Spark MLLib

  • Official documentation: The official documentation is clear, detailed and includes many code examples. You should refer to the official docs for exploration of this rich and rapidly growing library.

MLLib Pipeline

Generally, use of MLLIb for supervised and unsupervised learning follow some or all of the stages in the following template:

  • Get data
  • Pre-process the data
  • Convert data to a form that MLLib functions require (*)
  • Build a model
  • Optimize and fit the model to the data
  • Post-processing and model evaluation

This is often assembled as a pipeline for convenience and reproducibility. This is very similar to what you would do with sklearn, except that MLLib allows you to handle massive datasets by distributing the analysis to multiple computers.

Set up Spark and Spark SQL contexts

from pyspark import SparkContext
sc = SparkContext('local[*]')
from pyspark.sql import SQLContext
sqlc = SQLContext(sc)

Spark MLLib imports

The older mllib package works on RDDs. The newer ml package works on DataFrames. We will show examples using both, but it is more convenient to use the ml package.

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import PCA
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.clustering import GaussianMixture
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel

Unsupervised Learning

We saw this machine learning problem previously with sklearn, where the task is to distinguish rocks from mines using 60 sonar numerical features. We will illustrate some of the mechanics of how to work with MLLib - this is not intended to be a serious attempt at modeling the data.

Obtain data

df = (sqlc.read.format('com.databricks.spark.csv')
      .options(header='false', inferschema='true')
      .load('data/sonar.all-data.txt'))
df.printSchema()
root
 |-- C0: double (nullable = true)
 |-- C1: double (nullable = true)
 |-- C2: double (nullable = true)
 |-- C3: double (nullable = true)
 |-- C4: double (nullable = true)
 |-- C5: double (nullable = true)
 |-- C6: double (nullable = true)
 |-- C7: double (nullable = true)
 |-- C8: double (nullable = true)
 |-- C9: double (nullable = true)
 |-- C10: double (nullable = true)
 |-- C11: double (nullable = true)
 |-- C12: double (nullable = true)
 |-- C13: double (nullable = true)
 |-- C14: double (nullable = true)
 |-- C15: double (nullable = true)
 |-- C16: double (nullable = true)
 |-- C17: double (nullable = true)
 |-- C18: double (nullable = true)
 |-- C19: double (nullable = true)
 |-- C20: double (nullable = true)
 |-- C21: double (nullable = true)
 |-- C22: double (nullable = true)
 |-- C23: double (nullable = true)
 |-- C24: double (nullable = true)
 |-- C25: double (nullable = true)
 |-- C26: double (nullable = true)
 |-- C27: double (nullable = true)
 |-- C28: double (nullable = true)
 |-- C29: double (nullable = true)
 |-- C30: double (nullable = true)
 |-- C31: double (nullable = true)
 |-- C32: double (nullable = true)
 |-- C33: double (nullable = true)
 |-- C34: double (nullable = true)
 |-- C35: double (nullable = true)
 |-- C36: double (nullable = true)
 |-- C37: double (nullable = true)
 |-- C38: double (nullable = true)
 |-- C39: double (nullable = true)
 |-- C40: double (nullable = true)
 |-- C41: double (nullable = true)
 |-- C42: double (nullable = true)
 |-- C43: double (nullable = true)
 |-- C44: double (nullable = true)
 |-- C45: double (nullable = true)
 |-- C46: double (nullable = true)
 |-- C47: double (nullable = true)
 |-- C48: double (nullable = true)
 |-- C49: double (nullable = true)
 |-- C50: double (nullable = true)
 |-- C51: double (nullable = true)
 |-- C52: double (nullable = true)
 |-- C53: double (nullable = true)
 |-- C54: double (nullable = true)
 |-- C55: double (nullable = true)
 |-- C56: double (nullable = true)
 |-- C57: double (nullable = true)
 |-- C58: double (nullable = true)
 |-- C59: double (nullable = true)
 |-- C60: string (nullable = true)

Pre-process the data

df = df.withColumnRenamed("C60","label")

Transform 60 features into MMlib vectors

assembler = VectorAssembler(
    inputCols=['C%d' % i for i in range(60)],
    outputCol="features")
output = assembler.transform(df)

Scale features to have zero mean and unit standard deviation

standardizer = StandardScaler(withMean=True, withStd=True,
                              inputCol='features',
                              outputCol='std_features')
model = standardizer.fit(output)
output = model.transform(output)

Convert label to numeric index

indexer = StringIndexer(inputCol="label", outputCol="label_idx")
indexed = indexer.fit(output).transform(output)

Extract only columns of interest

sonar = indexed.select(['std_features', 'label', 'label_idx'])
sonar.show(n=3)
+--------------------+-----+---------+
|        std_features|label|label_idx|
+--------------------+-----+---------+
|[-0.3985897356694...|    R|      1.0|
|[0.70184498705605...|    R|      1.0|
|[-0.1289179854363...|    R|      1.0|
+--------------------+-----+---------+
only showing top 3 rows

Data conversion

We will first fit a Gaussian Mixture Model with 2 components to the first 2 principal components of the data as an example of unsupervised learning. The GaussianMixture model requires an RDD of vectors, not a DataFrame. Note that pyspark converts numpy arrays to Spark vectors.

pca = PCA(k=2, inputCol="std_features", outputCol="pca")
model = pca.fit(sonar)
transformed = model.transform(sonar)
features = transformed.select('pca').rdd.map(lambda x: np.array(x))
features.take(3)
[array([[-1.91654441,  1.36759373]]),
 array([[ 0.47896904, -7.56812953]]),
 array([[-3.84994003, -6.42436107]])]

Build Model

gmm = GaussianMixture.train(features, k=2)

Optimize and fit the model to data

Note that we are looking at optimistic in-sample errors.

predict = gmm.predict(features).collect()
labels = sonar.select('label_idx').rdd.map(lambda r: r[0]).collect()

Post-processing and model evaluation

The GMM is poor at clustering rocks and mines based on the first 2 PC of the sonographic data.

np.corrcoef(predict, labels)
array([[ 1.        , -0.13825324],
       [-0.13825324,  1.        ]])

Plot discrepancy between predicted and labels

xs = np.array(features.collect()).squeeze()

fig, axes = plt.subplots(1, 2, figsize=(10, 4))
axes[0].scatter(xs[:, 0], xs[:,1], c=predict)
axes[0].set_title('Predicted')
axes[1].scatter(xs[:, 0], xs[:,1], c=labels)
axes[1].set_title('Labels')
pass
_images/18D_Spark_MLib_36_0.png

Supervised Learning

We will fit a logistic regression model to the data as an example of supervised learning.

sonar.show(n=3)
+--------------------+-----+---------+
|        std_features|label|label_idx|
+--------------------+-----+---------+
|[-0.3985897356694...|    R|      1.0|
|[0.70184498705605...|    R|      1.0|
|[-0.1289179854363...|    R|      1.0|
+--------------------+-----+---------+
only showing top 3 rows

Using mllib and RDDs

Convert to format expected by regression functions in mllib

data = sonar.map(lambda x: LabeledPoint(x[2], x[0]))

Split into test and train sets

train, test = data.randomSplit([0.7, 0.3])

Fit model to training data

model = LogisticRegressionWithLBFGS.train(train)

Evaluate on test data

y_yhat = test.map(lambda x: (x.label, model.predict(x.features)))
err = y_yhat.filter(lambda x: x[0] != x[1]).count() / float(test.count())
print("Error = " + str(err))
Error = 0.26229508196721313

Using the newer ml pipeline

transformer = VectorAssembler(inputCols=['C%d' % i for i in range(60)],
                              outputCol="features")
standardizer = StandardScaler(withMean=True, withStd=True,
                              inputCol='features',
                              outputCol='std_features')
indexer = StringIndexer(inputCol="C60", outputCol="label_idx")
pca = PCA(k=5, inputCol="std_features", outputCol="pca")
lr = LogisticRegression(featuresCol='std_features', labelCol='label_idx')

pipeline = Pipeline(stages=[transformer, standardizer, indexer, pca, lr])
df = (sqlc.read.format('com.databricks.spark.csv')
      .options(header='false', inferschema='true')
      .load('data/sonar.all-data.txt'))
train, test = df.randomSplit([0.7, 0.3])
model = pipeline.fit(train)
import warnings

with warnings.catch_warnings():
    warnings.simplefilter('ignore')
    prediction = model.transform(test)
score = prediction.select(['label_idx', 'prediction'])
score.show(n=score.count())
+---------+----------+
|label_idx|prediction|
+---------+----------+
|      0.0|       0.0|
|      0.0|       0.0|
|      0.0|       0.0|
|      0.0|       0.0|
|      0.0|       0.0|
|      0.0|       0.0|
|      0.0|       0.0|
|      0.0|       0.0|
|      0.0|       1.0|
|      0.0|       0.0|
|      0.0|       0.0|
|      0.0|       1.0|
|      0.0|       0.0|
|      0.0|       1.0|
|      0.0|       0.0|
|      0.0|       0.0|
|      0.0|       0.0|
|      0.0|       1.0|
|      0.0|       0.0|
|      1.0|       0.0|
|      0.0|       0.0|
|      0.0|       0.0|
|      0.0|       1.0|
|      1.0|       0.0|
|      1.0|       1.0|
|      1.0|       0.0|
|      0.0|       0.0|
|      1.0|       1.0|
|      1.0|       1.0|
|      1.0|       1.0|
|      1.0|       1.0|
|      1.0|       1.0|
|      1.0|       1.0|
|      1.0|       1.0|
|      1.0|       0.0|
|      1.0|       1.0|
|      1.0|       0.0|
|      1.0|       1.0|
|      1.0|       1.0|
|      1.0|       1.0|
|      1.0|       1.0|
|      1.0|       1.0|
|      1.0|       1.0|
|      1.0|       1.0|
|      1.0|       1.0|
|      1.0|       1.0|
|      1.0|       1.0|
|      1.0|       1.0|
|      1.0|       0.0|
|      1.0|       0.0|
|      1.0|       1.0|
|      1.0|       1.0|
|      1.0|       0.0|
|      1.0|       1.0|
|      1.0|       1.0|
|      1.0|       1.0|
|      1.0|       0.0|
|      1.0|       0.0|
|      1.0|       1.0|
|      1.0|       1.0|
|      1.0|       1.0|
|      1.0|       1.0|
|      1.0|       1.0|
|      1.0|       1.0|
+---------+----------+
acc = score.map(lambda x: x[0] == x[1]).sum() / score.count()
acc
0.765625

Spark MLLIb and sklearn integration

There is a package that you can install with

pip install spark-sklearn

Basically, it provides the same API as sklearn but uses Spark MLLib under the hood to perform the actual computations in a distributed way (passed in via the SparkContext instance).

Example taken directly from package website

Install if necessary

! pip install spark_sklearn
from sklearn import svm, grid_search, datasets
from spark_sklearn import GridSearchCV
iris = datasets.load_iris()
parameters = {'kernel':('linear', 'rbf'), 'C':[1, 10]}
svr = svm.SVC()
clf = GridSearchCV(sc, svr, parameters)
clf.fit(iris.data, iris.target)
GridSearchCV(cv=None, error_score='raise',
       estimator=SVC(C=1.0, cache_size=200, class_weight=None, coef0=0.0,
  decision_function_shape=None, degree=3, gamma='auto', kernel='rbf',
  max_iter=-1, probability=False, random_state=None, shrinking=True,
  tol=0.001, verbose=False),
       fit_params={}, iid=True, n_jobs=1,
       param_grid={'kernel': ('linear', 'rbf'), 'C': [1, 10]},
       pre_dispatch='2*n_jobs', refit=True,
       sc=<pyspark.context.SparkContext object at 0x10435ef60>,
       scoring=None, verbose=0)