Spark MLLib¶
MLLib Pipeline¶
Generally, use of MLLIb for supervised and unsupervised learning follow some or all of the stages in the following template:
- Get data
- Pre-process the data
- Convert data to a form that MLLib functions require (*)
- Build a model
- Optimize and fit the model to the data
- Post-processing and model evaluation
This is often assembled as a pipeline for convenience and
reproducibility. This is very similar to what you would do with
sklearn
, except that MLLib allows you to handle massive datasets by
distributing the analysis to multiple computers.
Set up Spark and Spark SQL contexts¶
In [1]:
%%spark
Starting Spark application
SparkSession available as 'spark'.
Spark MLLib imports¶
The older mllib
package works on RDDs. The newer ml
package
works on DataFrames. We will show examples using both, but it is more
convenient to use the ml
package.
In [2]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import PCA
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.clustering import GaussianMixture
Unsupervised Learning¶
We saw this machine learning problem previously with sklearn
, where
the task is to distinguish rocks from mines using 60 sonar numerical
features. We will illustrate some of the mechanics of how to work with
MLLib - this is not intended to be a serious attempt at modeling the
data.
Obtain data¶
NAME: Sonar, Mines vs. Rocks
SUMMARY: This is the data set used by Gorman and Sejnowski in their study of the classification of sonar signals using a neural network [1]. The task is to train a network to discriminate between sonar signals bounced off a metal cylinder and those bounced off a roughly cylindrical rock.
SOURCE: The data set was contributed to the benchmark collection by Terry Sejnowski, now at the Salk Institute and the University of California at San Deigo. The data set was developed in collaboration with R. Paul Gorman of Allied-Signal Aerospace Technology Center.
See description
In [3]:
import pandas as pd
In [4]:
url = 'https://astro.temple.edu/~alan/sonar_all-data.txt'
data = pd.read_csv(url, header=None)
In [5]:
data.shape
(208, 61)
In [6]:
print(data.iloc[:3, :5])
0 1 2 3 4
0 0.0200 0.0371 0.0428 0.0207 0.0954
1 0.0453 0.0523 0.0843 0.0689 0.1183
2 0.0262 0.0582 0.1099 0.1083 0.0974
In [7]:
print(data.iloc[:3, -5:])
56 57 58 59 60
0 0.0180 0.0084 0.0090 0.0032 R
1 0.0140 0.0049 0.0052 0.0044 R
2 0.0316 0.0164 0.0095 0.0078 R
In [8]:
cols = ['C%02d' % i for i in range(60)] + ['raw_label']
df = spark.createDataFrame(data, cols)
In [9]:
df.printSchema()
root
|-- C00: double (nullable = true)
|-- C01: double (nullable = true)
|-- C02: double (nullable = true)
|-- C03: double (nullable = true)
|-- C04: double (nullable = true)
|-- C05: double (nullable = true)
|-- C06: double (nullable = true)
|-- C07: double (nullable = true)
|-- C08: double (nullable = true)
|-- C09: double (nullable = true)
|-- C10: double (nullable = true)
|-- C11: double (nullable = true)
|-- C12: double (nullable = true)
|-- C13: double (nullable = true)
|-- C14: double (nullable = true)
|-- C15: double (nullable = true)
|-- C16: double (nullable = true)
|-- C17: double (nullable = true)
|-- C18: double (nullable = true)
|-- C19: double (nullable = true)
|-- C20: double (nullable = true)
|-- C21: double (nullable = true)
|-- C22: double (nullable = true)
|-- C23: double (nullable = true)
|-- C24: double (nullable = true)
|-- C25: double (nullable = true)
|-- C26: double (nullable = true)
|-- C27: double (nullable = true)
|-- C28: double (nullable = true)
|-- C29: double (nullable = true)
|-- C30: double (nullable = true)
|-- C31: double (nullable = true)
|-- C32: double (nullable = true)
|-- C33: double (nullable = true)
|-- C34: double (nullable = true)
|-- C35: double (nullable = true)
|-- C36: double (nullable = true)
|-- C37: double (nullable = true)
|-- C38: double (nullable = true)
|-- C39: double (nullable = true)
|-- C40: double (nullable = true)
|-- C41: double (nullable = true)
|-- C42: double (nullable = true)
|-- C43: double (nullable = true)
|-- C44: double (nullable = true)
|-- C45: double (nullable = true)
|-- C46: double (nullable = true)
|-- C47: double (nullable = true)
|-- C48: double (nullable = true)
|-- C49: double (nullable = true)
|-- C50: double (nullable = true)
|-- C51: double (nullable = true)
|-- C52: double (nullable = true)
|-- C53: double (nullable = true)
|-- C54: double (nullable = true)
|-- C55: double (nullable = true)
|-- C56: double (nullable = true)
|-- C57: double (nullable = true)
|-- C58: double (nullable = true)
|-- C59: double (nullable = true)
|-- raw_label: string (nullable = true)
Pre-process the data¶
Transform 60 features into MMlib vectors
In [10]:
assembler = VectorAssembler(
inputCols=['C%02d' % i for i in range(60)],
outputCol="raw_features")
output = assembler.transform(df)
In [11]:
output.show(3)
+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+---------+--------------------+
| C00| C01| C02| C03| C04| C05| C06| C07| C08| C09| C10| C11| C12| C13| C14| C15| C16| C17| C18| C19| C20| C21| C22| C23| C24| C25| C26| C27| C28| C29| C30| C31| C32| C33| C34| C35| C36| C37| C38| C39| C40| C41| C42| C43| C44| C45| C46| C47| C48| C49| C50| C51| C52| C53| C54| C55| C56| C57| C58| C59|raw_label| raw_features|
+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+---------+--------------------+
| 0.02|0.0371|0.0428|0.0207|0.0954|0.0986|0.1539|0.1601|0.3109|0.2111|0.1609|0.1582|0.2238|0.0645| 0.066|0.2273| 0.31|0.2999|0.5078|0.4797|0.5783|0.5071|0.4328| 0.555|0.6711|0.6415|0.7104| 0.808|0.6791|0.3857|0.1307|0.2604|0.5121|0.7547|0.8537|0.8507|0.6692|0.6097|0.4943|0.2744| 0.051|0.2834|0.2825|0.4256|0.2641|0.1386|0.1051|0.1343|0.0383|0.0324|0.0232|0.0027|0.0065|0.0159|0.0072|0.0167| 0.018|0.0084| 0.009|0.0032| R|[0.02,0.0371,0.04...|
|0.0453|0.0523|0.0843|0.0689|0.1183|0.2583|0.2156|0.3481|0.3337|0.2872|0.4918|0.6552|0.6919|0.7797|0.7464|0.9444| 1.0|0.8874|0.8024|0.7818|0.5212|0.4052|0.3957|0.3914| 0.325| 0.32|0.3271|0.2767|0.4423|0.2028|0.3788|0.2947|0.1984|0.2341|0.1306|0.4182|0.3835|0.1057| 0.184| 0.197|0.1674|0.0583|0.1401|0.1628|0.0621|0.0203| 0.053|0.0742|0.0409|0.0061|0.0125|0.0084|0.0089|0.0048|0.0094|0.0191| 0.014|0.0049|0.0052|0.0044| R|[0.0453,0.0523,0....|
|0.0262|0.0582|0.1099|0.1083|0.0974| 0.228|0.2431|0.3771|0.5598|0.6194|0.6333| 0.706|0.5544| 0.532|0.6479|0.6931|0.6759|0.7551|0.8929|0.8619|0.7974|0.6737|0.4293|0.3648|0.5331|0.2413| 0.507|0.8533|0.6036|0.8514|0.8512|0.5045|0.1862|0.2709|0.4232|0.3043|0.6116|0.6756|0.5375|0.4719|0.4647|0.2587|0.2129|0.2222|0.2111|0.0176|0.1348|0.0744| 0.013|0.0106|0.0033|0.0232|0.0166|0.0095| 0.018|0.0244|0.0316|0.0164|0.0095|0.0078| R|[0.0262,0.0582,0....|
+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+---------+--------------------+
only showing top 3 rows
Scale features to have zero mean and unit standard deviation
In [12]:
standardizer = StandardScaler(withMean=True, withStd=True,
inputCol='raw_features',
outputCol='features')
model = standardizer.fit(output)
output = model.transform(output)
Convert label to numeric index
In [13]:
indexer = StringIndexer(inputCol="raw_label", outputCol="label")
indexed = indexer.fit(output).transform(output)
Extract only columns of interest
In [14]:
sonar = indexed.select(['features', 'label'])
In [15]:
sonar.show(n=3)
+--------------------+-----+
| features|label|
+--------------------+-----+
|[-0.3985897356694...| 1.0|
|[0.70184498705605...| 1.0|
|[-0.1289179854363...| 1.0|
+--------------------+-----+
only showing top 3 rows
Data conversion¶
We will first fit a Gaussian Mixture Model with 2 components to the first 2 principal components of the data as an example of unsupervised learning.
In [16]:
import numpy as np
In [17]:
pca = PCA(k=2, inputCol="features", outputCol="pca")
model = pca.fit(sonar)
transformed = model.transform(sonar)
In [18]:
transformed.show(3)
+--------------------+-----+--------------------+
| features|label| pca|
+--------------------+-----+--------------------+
|[-0.3985897356694...| 1.0|[-1.9165444107164...|
|[0.70184498705605...| 1.0|[0.47896904316843...|
|[-0.1289179854363...| 1.0|[-3.8499400285258...|
+--------------------+-----+--------------------+
only showing top 3 rows
In [19]:
features = transformed.select('pca')
In [20]:
features = transformed.select('pca').rdd.map(lambda x: np.array(x))
In [21]:
features.take(3)
[array([[-1.91654441, 1.36759373]]), array([[ 0.47896904, -7.56812953]]), array([[-3.84994003, -6.42436107]])]
Build Model¶
In [22]:
gmm = GaussianMixture(k=2, seed=123, featuresCol='pca')
In [23]:
model = gmm.fit(transformed)
In [24]:
transformed2 = model.transform(transformed)
In [25]:
transformed2.show(4)
+--------------------+-----+--------------------+----------+--------------------+
| features|label| pca|prediction| probability|
+--------------------+-----+--------------------+----------+--------------------+
|[-0.3985897356694...| 1.0|[-1.9165444107164...| 0|[0.62298055982439...|
|[0.70184498705605...| 1.0|[0.47896904316843...| 0|[0.99997018397790...|
|[-0.1289179854363...| 1.0|[-3.8499400285258...| 0|[0.83185871352854...|
|[-0.8335441715294...| 1.0|[-4.5863546250792...| 1|[0.00590393467266...|
+--------------------+-----+--------------------+----------+--------------------+
only showing top 4 rows
Get fitted Guassian parameters as DataFrame¶
In [26]:
model.gaussiansDF.show(truncate=False)
+---------------------------------------+---------------------------------------------------------------------------------+
|mean |cov |
+---------------------------------------+---------------------------------------------------------------------------------+
|[1.45630530064182,-0.37037164792437044]|3.615993244446481 1.5178686905624637
1.5178686905624637 10.969615585723204 |
|[-5.357055209406797,1.362421303455668] |7.039461019406999 3.6991498391254107
3.6991498391254107 10.078828243534241 |
+---------------------------------------+---------------------------------------------------------------------------------+
Supervised Learning¶
We will fit a logistic regression model to the data as an example of supervised learning.
In [27]:
sonar.show(n=3)
+--------------------+-----+
| features|label|
+--------------------+-----+
|[-0.3985897356694...| 1.0|
|[0.70184498705605...| 1.0|
|[-0.1289179854363...| 1.0|
+--------------------+-----+
only showing top 3 rows
Using ml
for logistic regression¶
Convert to format expected by regression functions in mllib
In [28]:
sonar.printSchema()
root
|-- features: vector (nullable = true)
|-- label: double (nullable = true)
In [29]:
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)
Split into test and train sets
In [30]:
train, test = sonar.randomSplit([0.7, 0.3])
Fit model to training data
In [31]:
train.show(4)
+--------------------+-----+
| features|label|
+--------------------+-----+
|[-1.0988663774039...| 1.0|
|[-0.9727295910045...| 1.0|
|[-0.9248846030599...| 1.0|
|[-0.8770396151153...| 1.0|
+--------------------+-----+
only showing top 4 rows
In [32]:
model = lr.fit(train)
Evaluate on test data
In [33]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
In [34]:
predictions = model.transform(test)
In [35]:
predictions.show(3)
+--------------------+-----+--------------------+--------------------+----------+
| features|label| rawPrediction| probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[-0.9031368812669...| 1.0|[-3.9291177051083...|[0.01928190985745...| 1.0|
|[-0.8726900707567...| 1.0|[-4.1830133029604...|[0.01502333483134...| 1.0|
|[-0.8335441715294...| 1.0|[0.47133208472102...|[0.61569899448073...| 0.0|
+--------------------+-----+--------------------+--------------------+----------+
only showing top 3 rows
In [36]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
In [37]:
evaluator.getMetricName()
'areaUnderROC'
In [38]:
evaluator.evaluate(predictions)
0.8369905956112853
Using the ml
pipeline¶
We build a pipeline to preoprcess and fit a logistic regression model to the original DataFrame. The pipeline stages consist of
- Convert featrue columns in DataFrame into a vector of features
- Scele features to have zero mean and unit standard deviation
- Convert string labels into numeric labels
- Reduce dimensionality using PCA with the first 5 PCs
- Run logistic regression to predict the labels from the feature vector of 5 Principal Components
In [39]:
df.show(1)
+----+------+------+------+------+------+------+------+------+------+------+------+------+------+-----+------+----+------+------+------+------+------+------+-----+------+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+-----+------+-----+------+---------+
| C00| C01| C02| C03| C04| C05| C06| C07| C08| C09| C10| C11| C12| C13| C14| C15| C16| C17| C18| C19| C20| C21| C22| C23| C24| C25| C26| C27| C28| C29| C30| C31| C32| C33| C34| C35| C36| C37| C38| C39| C40| C41| C42| C43| C44| C45| C46| C47| C48| C49| C50| C51| C52| C53| C54| C55| C56| C57| C58| C59|raw_label|
+----+------+------+------+------+------+------+------+------+------+------+------+------+------+-----+------+----+------+------+------+------+------+------+-----+------+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+-----+------+-----+------+---------+
|0.02|0.0371|0.0428|0.0207|0.0954|0.0986|0.1539|0.1601|0.3109|0.2111|0.1609|0.1582|0.2238|0.0645|0.066|0.2273|0.31|0.2999|0.5078|0.4797|0.5783|0.5071|0.4328|0.555|0.6711|0.6415|0.7104|0.808|0.6791|0.3857|0.1307|0.2604|0.5121|0.7547|0.8537|0.8507|0.6692|0.6097|0.4943|0.2744|0.051|0.2834|0.2825|0.4256|0.2641|0.1386|0.1051|0.1343|0.0383|0.0324|0.0232|0.0027|0.0065|0.0159|0.0072|0.0167|0.018|0.0084|0.009|0.0032| R|
+----+------+------+------+------+------+------+------+------+------+------+------+------+------+-----+------+----+------+------+------+------+------+------+-----+------+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+-----+------+-----+------+---------+
only showing top 1 row
In [40]:
transformer = VectorAssembler(
inputCols=['C%02d' % i for i in range(60)],
outputCol="raw_features"
)
standardizer = StandardScaler(
withMean=True,
withStd=True,
inputCol='raw_features',
outputCol='features'
)
indexer = StringIndexer(
inputCol="raw_label",
outputCol="label"
)
pca = PCA(
k=5,
inputCol="features",
outputCol="pca"
)
lr = LogisticRegression(
featuresCol='features',
labelCol='label'
)
pipeline = Pipeline(stages=[transformer, standardizer, indexer, pca, lr])
In [41]:
train, test = df.randomSplit([0.7, 0.3])
In [42]:
model = pipeline.fit(train)
In [43]:
import warnings
with warnings.catch_warnings():
warnings.simplefilter('ignore')
prediction = model.transform(test)
In [44]:
score = prediction.select(['label', 'prediction'])
score.show(n=score.count())
+-----+----------+
|label|prediction|
+-----+----------+
| 1.0| 1.0|
| 1.0| 0.0|
| 1.0| 1.0|
| 1.0| 1.0|
| 1.0| 1.0|
| 1.0| 1.0|
| 1.0| 1.0|
| 1.0| 1.0|
| 1.0| 1.0|
| 1.0| 1.0|
| 1.0| 0.0|
| 1.0| 1.0|
| 1.0| 1.0|
| 1.0| 1.0|
| 1.0| 1.0|
| 1.0| 1.0|
| 1.0| 1.0|
| 0.0| 1.0|
| 1.0| 0.0|
| 1.0| 1.0|
| 1.0| 1.0|
| 1.0| 1.0|
| 1.0| 0.0|
| 1.0| 1.0|
| 0.0| 0.0|
| 1.0| 1.0|
| 0.0| 1.0|
| 0.0| 0.0|
| 0.0| 0.0|
| 0.0| 1.0|
| 0.0| 0.0|
| 0.0| 1.0|
| 0.0| 0.0|
| 0.0| 1.0|
| 0.0| 0.0|
| 0.0| 1.0|
| 0.0| 0.0|
| 0.0| 1.0|
| 0.0| 0.0|
| 0.0| 0.0|
| 0.0| 0.0|
| 0.0| 0.0|
| 0.0| 0.0|
| 0.0| 1.0|
| 0.0| 0.0|
| 0.0| 0.0|
| 0.0| 0.0|
| 0.0| 1.0|
| 0.0| 0.0|
| 0.0| 0.0|
| 0.0| 0.0|
| 0.0| 0.0|
| 0.0| 0.0|
| 0.0| 0.0|
| 0.0| 0.0|
| 0.0| 0.0|
| 0.0| 0.0|
| 0.0| 0.0|
| 0.0| 0.0|
+-----+----------+
Evaluate accuracy¶
In [45]:
acc = score.rdd.map(lambda x: x[0] == x[1]).sum() / float(score.count())
acc
0.7796610169491526
In [46]:
spark.stop()