Spark MLLib

The MLLib library has two packages: pyspark.mllib, which provides an RDD-based interface, and pyspark.ml, which provides a DataFrame-based interface. We will focus only on the high-level DataFrame interface.
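The distinction matters mainly at import time, since many algorithms exist in both namespaces. A minimal sketch of the two styles (the RDD-based package has been in maintenance mode since Spark 2.0):

# RDD-based interface (maintenance mode)
from pyspark.mllib.classification import LogisticRegressionWithLBFGS

# DataFrame-based interface -- the one used in this notebook
from pyspark.ml.classification import LogisticRegression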

Topics:

  • Preparing a DataFrame for ML

  • Generating simple statistics

  • Splitting data

  • Preprocessing

    • Feature extraction

    • Imputation

    • Transforms

  • Unsupervised learning

  • Supervised learning

    • Hyperparameter optimization

    • Using R formula

    • Evaluation

  • Using pipelines

Set up Spark and Spark SQL contexts

[4]:
from pyspark.sql import SparkSession
[5]:
spark = (
    SparkSession.builder
    .master("local")
    .appName("BIOS-823")
    .config("spark.executor.cores", 4)
    .getOrCreate()
)

Preparing a DataFrame for ML

Spark provides dense and sparse vectors and matrices mainly as data structures for ML, so although they live in the ml.linalg module, the facilities for manipulating them are very limited.

[6]:
import numpy as np
[7]:
from pyspark.ml.linalg import Vectors

Vectors

[8]:
v1 = Vectors.dense(range(1,11))
v2 = Vectors.dense(range(0, 10))
[9]:
v1
[9]:
DenseVector([1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0])
[10]:
v2
[10]:
DenseVector([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0])
[11]:
v1.array
[11]:
array([ 1.,  2.,  3.,  4.,  5.,  6.,  7.,  8.,  9., 10.])
[12]:
v1.norm(p=1)
[12]:
55.0
[13]:
v1.sum()
[13]:
55.0
[14]:
v1.norm(p=2)
[14]:
19.621416870348583
[15]:
np.sqrt((v1*v1).sum())
[15]:
19.621416870348583
[16]:
v1.squared_distance(v2)
[16]:
10.0
[17]:
((v1 - v2).array**2).sum()
[17]:
10.0
[18]:
v1.dot(v2)
[18]:
330.0
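Sparse vectors support the same operations; a minimal sketch, building one from its size and the indices and values of its non-zero entries:

from pyspark.ml.linalg import Vectors

sv = Vectors.sparse(10, [0, 3], [1.0, 4.0])  # size 10, non-zeros at indices 0 and 3
sv.norm(p=2)  # same API as DenseVector
sv.dot(v1)    # sparse-dense operations also work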

Manual construction of an ML DataFrame

ML operations generally require a DataFrame with columns corresponding to a label and to features represented as a Vector.

[19]:
data =[
    (0.0, Vectors.dense([1,2,3])),
    (1.0, Vectors.dense([2,3,4])),
    (0.0, Vectors.dense([3,4,5])),
    (1.0, Vectors.dense([2,2,2]))
]

df = spark.createDataFrame(data, ['label', 'features'])
[20]:
df.show()
+-----+-------------+
|label|     features|
+-----+-------------+
|  0.0|[1.0,2.0,3.0]|
|  1.0|[2.0,3.0,4.0]|
|  0.0|[3.0,4.0,5.0]|
|  1.0|[2.0,2.0,2.0]|
+-----+-------------+

[21]:
from pyspark.ml.functions import vector_to_array
[22]:
from pyspark.sql.functions import col
[23]:
df.select('label', vector_to_array('features')).show()
+-----+---------------+
|label|  UDF(features)|
+-----+---------------+
|  0.0|[1.0, 2.0, 3.0]|
|  1.0|[2.0, 3.0, 4.0]|
|  0.0|[3.0, 4.0, 5.0]|
|  1.0|[2.0, 2.0, 2.0]|
+-----+---------------+

[24]:
df.withColumn("xs", vector_to_array("features")).show()
+-----+-------------+---------------+
|label|     features|             xs|
+-----+-------------+---------------+
|  0.0|[1.0,2.0,3.0]|[1.0, 2.0, 3.0]|
|  1.0|[2.0,3.0,4.0]|[2.0, 3.0, 4.0]|
|  0.0|[3.0,4.0,5.0]|[3.0, 4.0, 5.0]|
|  1.0|[2.0,2.0,2.0]|[2.0, 2.0, 2.0]|
+-----+-------------+---------------+

[25]:
df.withColumn("xs", vector_to_array("features")).dtypes
[25]:
[('label', 'double'), ('features', 'vector'), ('xs', 'array<double>')]
[26]:
(
    df.
    withColumn("xs", vector_to_array("features")).
    select(["label"] + [col("xs")[i] for i in range(3)])
).show()
+-----+-----+-----+-----+
|label|xs[0]|xs[1]|xs[2]|
+-----+-----+-----+-----+
|  0.0|  1.0|  2.0|  3.0|
|  1.0|  2.0|  3.0|  4.0|
|  0.0|  3.0|  4.0|  5.0|
|  1.0|  2.0|  2.0|  2.0|
+-----+-----+-----+-----+
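The reverse conversion is array_to_vector, added to pyspark.ml.functions in Spark 3.1; a sketch, assuming a Spark 3.1+ installation:

from pyspark.ml.functions import array_to_vector  # requires Spark 3.1+

(
    df.
    withColumn("xs", vector_to_array("features")).
    withColumn("features2", array_to_vector("xs"))
).dtypes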

Using VectorAssembler

[27]:
from pyspark.ml.feature import VectorAssembler
[28]:
import pandas as pd
[29]:
url = 'https://bit.ly/3eoBK6t'
pdf = pd.read_csv(url)
[30]:
pdf.head(3)
[30]:
   sepal_length  sepal_width  petal_length  petal_width species
0           5.1          3.5           1.4          0.2  setosa
1           4.9          3.0           1.4          0.2  setosa
2           4.7          3.2           1.3          0.2  setosa
[31]:
iris = spark.createDataFrame(pdf)
[32]:
iris.show(3)
+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 3 rows

[33]:
iris.columns[:-1]
[33]:
['sepal_length', 'sepal_width', 'petal_length', 'petal_width']
[34]:
assembler = VectorAssembler(
    inputCols=iris.columns[:-1],
    outputCol='raw_features')
df = assembler.transform(iris)
[35]:
df.show(3)
+------------+-----------+------------+-----------+-------+-----------------+
|sepal_length|sepal_width|petal_length|petal_width|species|     raw_features|
+------------+-----------+------------+-----------+-------+-----------------+
|         5.1|        3.5|         1.4|        0.2| setosa|[5.1,3.5,1.4,0.2]|
|         4.9|        3.0|         1.4|        0.2| setosa|[4.9,3.0,1.4,0.2]|
|         4.7|        3.2|         1.3|        0.2| setosa|[4.7,3.2,1.3,0.2]|
+------------+-----------+------------+-----------+-------+-----------------+
only showing top 3 rows

Generating simple statistics

[36]:
import pyspark.ml.stat as stat
[37]:
df.select(stat.Summarizer.mean(col('raw_features'))).show()
+--------------------+
|  mean(raw_features)|
+--------------------+
|[5.84333333333333...|
+--------------------+

[38]:
(
    df.select(
        stat.Summarizer.metrics('std', 'min', 'max', 'mean').
        summary(col('raw_features'))).
    show(truncate=False)
)
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|aggregate_metrics(raw_features, 1.0)                                                                                                                                                               |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[[0.8280661279778628,0.43359431136217363,1.7644204199522624,0.7631607417008414], [4.3,2.0,1.0,0.1], [7.9,4.4,6.9,2.5], [5.843333333333335,3.0540000000000003,3.758666666666667,1.1986666666666668]]|
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Splitting data

[39]:
train_data, test_data = df.randomSplit(weights=[2.,1.], seed=0)
[40]:
train_data.count(), test_data.count()
[40]:
(98, 52)
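Note that randomSplit assigns each row independently according to the (normalized) weights, so the realized 98/52 split only approximates the requested 2:1 ratio.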
[41]:
train_data.sample(0.1).show()
+------------+-----------+------------+-----------+----------+-----------------+
|sepal_length|sepal_width|petal_length|petal_width|   species|     raw_features|
+------------+-----------+------------+-----------+----------+-----------------+
|         5.0|        3.4|         1.5|        0.2|    setosa|[5.0,3.4,1.5,0.2]|
|         5.4|        3.7|         1.5|        0.2|    setosa|[5.4,3.7,1.5,0.2]|
|         4.8|        3.0|         1.4|        0.1|    setosa|[4.8,3.0,1.4,0.1]|
|         4.8|        3.4|         1.9|        0.2|    setosa|[4.8,3.4,1.9,0.2]|
|         4.9|        3.1|         1.5|        0.1|    setosa|[4.9,3.1,1.5,0.1]|
|         5.2|        4.1|         1.5|        0.1|    setosa|[5.2,4.1,1.5,0.1]|
|         5.4|        3.4|         1.5|        0.4|    setosa|[5.4,3.4,1.5,0.4]|
|         5.6|        3.0|         4.5|        1.5|versicolor|[5.6,3.0,4.5,1.5]|
|         5.8|        2.7|         4.1|        1.0|versicolor|[5.8,2.7,4.1,1.0]|
|         6.1|        2.9|         4.7|        1.4|versicolor|[6.1,2.9,4.7,1.4]|
|         5.0|        2.3|         3.3|        1.0|versicolor|[5.0,2.3,3.3,1.0]|
|         5.5|        2.6|         4.4|        1.2|versicolor|[5.5,2.6,4.4,1.2]|
|         5.7|        3.0|         4.2|        1.2|versicolor|[5.7,3.0,4.2,1.2]|
|         6.4|        2.7|         5.3|        1.9| virginica|[6.4,2.7,5.3,1.9]|
|         6.5|        3.2|         5.1|        2.0| virginica|[6.5,3.2,5.1,2.0]|
|         6.7|        3.3|         5.7|        2.1| virginica|[6.7,3.3,5.7,2.1]|
|         6.4|        3.1|         5.5|        1.8| virginica|[6.4,3.1,5.5,1.8]|
|         6.9|        3.1|         5.1|        2.3| virginica|[6.9,3.1,5.1,2.3]|
+------------+-----------+------------+-----------+----------+-----------------+

[42]:
test_data.sample(0.1).show()
+------------+-----------+------------+-----------+----------+-----------------+
|sepal_length|sepal_width|petal_length|petal_width|   species|     raw_features|
+------------+-----------+------------+-----------+----------+-----------------+
|         5.0|        3.6|         1.4|        0.2|    setosa|[5.0,3.6,1.4,0.2]|
|         5.2|        3.5|         1.5|        0.2|    setosa|[5.2,3.5,1.5,0.2]|
|         4.9|        2.4|         3.3|        1.0|versicolor|[4.9,2.4,3.3,1.0]|
|         6.3|        2.9|         5.6|        1.8| virginica|[6.3,2.9,5.6,1.8]|
|         7.2|        3.2|         6.0|        1.8| virginica|[7.2,3.2,6.0,1.8]|
|         6.7|        3.3|         5.7|        2.5| virginica|[6.7,3.3,5.7,2.5]|
+------------+-----------+------------+-----------+----------+-----------------+

[43]:
train_data = train_data.persist()
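Persisting caches the training DataFrame in memory, so the repeated fit and transform passes below do not recompute its lineage each time.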

Preprocessing data

Encoding categorical features

[44]:
from pyspark.ml.feature import StringIndexer
[45]:
indexer = StringIndexer(inputCol='species', outputCol='label')
[46]:
indexer = indexer.fit(train_data)
[47]:
train_df = indexer.transform(train_data)
test_df = indexer.transform(test_data)
[48]:
train_df.sample(0.1).show()
+------------+-----------+------------+-----------+----------+-----------------+-----+
|sepal_length|sepal_width|petal_length|petal_width|   species|     raw_features|label|
+------------+-----------+------------+-----------+----------+-----------------+-----+
|         4.6|        3.4|         1.4|        0.3|    setosa|[4.6,3.4,1.4,0.3]|  0.0|
|         4.9|        3.0|         1.4|        0.2|    setosa|[4.9,3.0,1.4,0.2]|  0.0|
|         4.7|        3.2|         1.6|        0.2|    setosa|[4.7,3.2,1.6,0.2]|  0.0|
|         6.6|        2.9|         4.6|        1.3|versicolor|[6.6,2.9,4.6,1.3]|  1.0|
|         5.6|        2.5|         3.9|        1.1|versicolor|[5.6,2.5,3.9,1.1]|  1.0|
|         5.9|        3.0|         4.2|        1.5|versicolor|[5.9,3.0,4.2,1.5]|  1.0|
|         6.2|        2.2|         4.5|        1.5|versicolor|[6.2,2.2,4.5,1.5]|  1.0|
|         6.7|        3.1|         4.4|        1.4|versicolor|[6.7,3.1,4.4,1.4]|  1.0|
|         6.4|        2.9|         4.3|        1.3|versicolor|[6.4,2.9,4.3,1.3]|  1.0|
|         5.5|        2.6|         4.4|        1.2|versicolor|[5.5,2.6,4.4,1.2]|  1.0|
|         6.2|        3.4|         5.4|        2.3| virginica|[6.2,3.4,5.4,2.3]|  2.0|
|         6.9|        3.1|         5.1|        2.3| virginica|[6.9,3.1,5.1,2.3]|  2.0|
+------------+-----------+------------+-----------+----------+-----------------+-----+
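By default, StringIndexer assigns indices by descending label frequency, so the most frequent class gets index 0. The fitted model exposes the mapping:

indexer.labels  # e.g. ['setosa', 'versicolor', 'virginica']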

Scaling

[49]:
from pyspark.ml.feature import StandardScaler
[50]:
scaler = StandardScaler(withMean=True,
                        withStd=True,
                        inputCol='raw_features',
                        outputCol='features')
[51]:
scaler = scaler.fit(train_df)
[52]:
train_df_scaled = scaler.transform(train_df)
test_df_scaled = scaler.transform(test_df)
[53]:
train_df_scaled.select('features', 'features').show(5, truncate=False)
+---------------------------------------------------------------------------------+---------------------------------------------------------------------------------+
|features                                                                         |features                                                                         |
+---------------------------------------------------------------------------------+---------------------------------------------------------------------------------+
|[-1.5213158492369616,0.00692241917037384,-1.2132855899947563,-1.22045281288949]  |[-1.5213158492369616,0.00692241917037384,-1.2132855899947563,-1.22045281288949]  |
|[-1.5213158492369616,0.6853194978669278,-1.2714283713245784,-1.0882932817699649] |[-1.5213158492369616,0.6853194978669278,-1.2714283713245784,-1.0882932817699649] |
|[-1.390075573510533,0.23305477873589214,-1.3295711526544005,-1.22045281288949]   |[-1.390075573510533,0.23305477873589214,-1.3295711526544005,-1.22045281288949]   |
|[-1.1275950220576765,-0.21920994039514446,-1.2714283713245784,-1.22045281288949] |[-1.1275950220576765,-0.21920994039514446,-1.2714283713245784,-1.22045281288949] |
|[-1.1275950220576765,0.00692241917037384,-1.2132855899947563,-1.3526123440090148]|[-1.1275950220576765,0.00692241917037384,-1.2132855899947563,-1.3526123440090148]|
+---------------------------------------------------------------------------------+---------------------------------------------------------------------------------+
only showing top 5 rows

Unsupervised learning

Dimension reduction

[54]:
from pyspark.ml.feature import PCA
[55]:
pca = PCA(k=2, inputCol='features', outputCol='pc').fit(train_df_scaled)
[56]:
train_df_scaled = pca.transform(train_df_scaled)
[57]:
train_df_scaled.select('pc').show(5, truncate=False)
+----------------------------------------+
|pc                                      |
+----------------------------------------+
|[2.1719561815281754,0.7342240484007609] |
|[2.3257684348824283,0.11499107410380768]|
|[2.2379887825335114,0.47537636776842684]|
|[1.9410238356438345,0.7700780683206433] |
|[2.0469675488421304,0.5739261085291683] |
+----------------------------------------+
only showing top 5 rows
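The fitted PCAModel also reports how much variance each component captures, which helps judge whether k=2 is adequate; a quick check:

pca.explainedVariance  # proportion of variance per component
pca.pc                 # the principal components as a DenseMatrix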

[58]:
import matplotlib.pyplot as plt
[59]:
pdf = (
    train_df_scaled.select('label', 'pc').
    withColumn("pc", vector_to_array("pc")).
    select(["label"] + [col("pc")[i] for i in range(2)])
).toPandas()
[60]:
pdf.head(3)
[60]:
   label     pc[0]     pc[1]
0    0.0  2.171956  0.734224
1    0.0  2.325768  0.114991
2    0.0  2.237989  0.475376
[61]:
plt.scatter(pdf.iloc[:, 1], pdf.iloc[:, 2], c=pdf.iloc[:, 0])
plt.xlabel('PC1')
plt.ylabel('PC2')
plt.tight_layout()
[Figure: scatter plot of the first two principal components (PC1 vs PC2), colored by species label]

Clustering

[62]:
from pyspark.ml.clustering import GaussianMixture
[63]:
gmm = GaussianMixture(
    featuresCol='features',
    k=3,
    seed=0)
[64]:
gmm = gmm.fit(train_df_scaled)
[65]:
gmm.gaussiansDF.toPandas()
[65]:
                                                mean                                                cov
0  [-0.9380257346418976, 0.8046671327476248, -1.2...  DenseMatrix([[0.2166823 , 0.29970786, 0.013235...
1  [0.005938775919945091, -0.5221004264901148, 0....  DenseMatrix([[ 0.10356088, -0.0849359 , 0.056...
2  [0.6061006081018865, -0.4609677242002642, 0.72...  DenseMatrix([[0.65443637, 0.40398667, 0.255851...
[66]:
train_df_scaled = gmm.transform(train_df_scaled)
[67]:
train_df_scaled.select('features', 'label', 'probability', 'prediction').sample(0.1).show()
+--------------------+-----+--------------------+----------+
|            features|label|         probability|prediction|
+--------------------+-----+--------------------+----------+
|[-1.5213158492369...|  0.0|[0.99999999999278...|         0|
|[-0.2089130919726...|  1.0|[5.51104734455841...|         1|
|[-0.0776728162462...|  1.0|[6.94499110009014...|         1|
|[0.70976883811231...|  2.0|[9.24177181915992...|         2|
|[2.02217159537659...|  2.0|[2.49896507749226...|         2|
|[0.31604801093303...|  2.0|[6.91948378303032...|         2|
+--------------------+-----+--------------------+----------+
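Beyond the per-row probabilities, the fitted model exposes the mixing weights and a training summary; a brief sketch:

gmm.weights               # mixing weights of the k=3 components
gmm.summary.clusterSizes  # training rows assigned to each component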

Supervised learning

[68]:
train_df_scaled = train_df_scaled.select('features', 'label')
[69]:
test_df_scaled = test_df_scaled.select('features', 'label')
[70]:
from pyspark.ml.classification import RandomForestClassifier
[71]:
rf = RandomForestClassifier(
    numTrees=10,
    maxDepth=3,
    seed=0
)
[72]:
rf = rf.fit(train_df_scaled)
[73]:
test_df_scaled = rf.transform(test_df_scaled)
[74]:
test_df_scaled.sample(0.1).show()
+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[0.31604801093303...|  1.0|[0.03030303030303...|[0.00303030303030...|       1.0|
+--------------------+-----+--------------------+--------------------+----------+
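The fitted forest also exposes impurity-based feature importances, ordered as the columns passed to VectorAssembler; a quick look:

rf.featureImportances  # SparseVector over the four scaled features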

Model evaluation

[75]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
[76]:
evaluator = MulticlassClassificationEvaluator()
[77]:
evaluator.evaluate(test_df_scaled)
[77]:
0.9618662587412588
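The default metric for MulticlassClassificationEvaluator is f1; below we switch to accuracy.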
[78]:
evaluator = evaluator.setMetricName('accuracy')
[79]:
evaluator.evaluate(test_df_scaled)
[79]:
0.9615384615384616

Pipelines

We can assemble a pipeline for convenience and reproducibility. This is very similar to what you would do with sklearn, except that MLLib lets you handle massive datasets by distributing the analysis across multiple machines.

We will put all the preceding steps into the pipeline, and also add hyperparameter optimization via cross-validation.

[80]:
from pyspark.ml import Pipeline
[81]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
[82]:
assembler = VectorAssembler(
    inputCols=iris.columns[:-1],
    outputCol='raw_features'
)
scaler = StandardScaler(
    withMean=True,
    withStd=True,
    inputCol='raw_features',
    outputCol='features'
)
indexer = StringIndexer(
    inputCol="species",
    outputCol="label"
)
rf = RandomForestClassifier()
params = (
    ParamGridBuilder().
    addGrid(rf.numTrees, [5,10,25]).
    addGrid(rf.maxDepth, [3,4,5]).
    build()
)
evaluator = MulticlassClassificationEvaluator(
    metricName='accuracy'
)
cv = CrossValidator(
    estimator=rf,
    evaluator=evaluator,
    estimatorParamMaps=params,
    numFolds=3,
    seed=0
).setParallelism(4)
[83]:
pipeline = Pipeline(
    stages=[assembler, scaler, indexer, cv]
)
[84]:
train_iris, test_iris = iris.randomSplit([2.0, 1.0], seed=0)
[85]:
model = pipeline.fit(train_iris)
[86]:
model.stages
[86]:
[VectorAssembler_b707c6dd0daa,
 StandardScalerModel: uid=StandardScaler_6f905091c141, numFeatures=4, withMean=true, withStd=true,
 StringIndexerModel: uid=StringIndexer_680263d99f3c, handleInvalid=error,
 CrossValidatorModel_a1e2fa6e9d8f]
[87]:
# pair each hyperparameter combination with its mean cross-validation metric
df_cv = pd.DataFrame(list(zip(model.stages[3].getEstimatorParamMaps(), model.stages[3].avgMetrics)))
# expand each ParamMap's values into separate numTrees and maxDepth columns
df_cv[['numTrees', 'maxDepth']] = df_cv[0].apply(lambda x: pd.Series(list(x.values())))
df_cv = df_cv.drop(0, axis=1)
df_cv = df_cv.iloc[:, [1, 2, 0]]
df_cv.columns = df_cv.columns[:2].tolist() + ['accuracy']
df_cv.sort_values('accuracy', ascending=False)
[87]:
   numTrees  maxDepth  accuracy
6        25         3  0.958970
7        25         4  0.958970
8        25         5  0.958970
5        10         5  0.948217
2         5         5  0.941426
0         5         3  0.940685
3        10         3  0.936723
4        10         4  0.936723
1         5         4  0.929932
[88]:
prediction = model.transform(test_iris)
[89]:
evaluator.evaluate(prediction)
[89]:
0.9807692307692307
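The winning model can be pulled back out of the fitted pipeline for inspection; a sketch:

cv_model = model.stages[3]    # the fitted CrossValidatorModel
best_rf = cv_model.bestModel  # the winning RandomForestClassificationModel
best_rf.extractParamMap()     # inspect the chosen hyperparameters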

Using Spark for non-MLLib models

We can also run non-MLLib machine learning algorithms using Spark.

Hyperparameter optimization

[90]:
! python3 -m pip install --quiet joblibspark
[91]:
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.utils import parallel_backend
from joblibspark import register_spark
[92]:
url = 'https://bit.ly/3eoBK6t'
iris = pd.read_csv(url)

X = iris.iloc[:, :-1]
y = iris.species.astype('category').cat.codes

X_train, X_test, y_train, y_test = train_test_split(X, y)
[93]:
svc = SVC(random_state=0)
params = dict(
    C = [0.1, 1, 10],
    kernel = ['linear', 'poly', 'rbf', 'sigmoid']
)
cv = GridSearchCV(svc, params, cv=3)
[94]:
register_spark()
[95]:
with parallel_backend('spark', n_jobs=4):
    cv.fit(X_train, y_train)
[96]:
pd.DataFrame(cv.cv_results_)
[96]:
mean_fit_time std_fit_time mean_score_time std_score_time param_C param_kernel params split0_test_score split1_test_score split2_test_score mean_test_score std_test_score rank_test_score
0 0.003228 0.000145 0.002065 0.000645 0.1 linear {'C': 0.1, 'kernel': 'linear'} 0.973684 0.972973 0.972973 0.973210 0.000335 3
1 0.003249 0.000082 0.001860 0.000418 0.1 poly {'C': 0.1, 'kernel': 'poly'} 0.973684 0.972973 0.972973 0.973210 0.000335 3
2 0.003456 0.000053 0.001981 0.000424 0.1 rbf {'C': 0.1, 'kernel': 'rbf'} 0.763158 0.864865 0.756757 0.794927 0.049523 9
3 0.003865 0.000290 0.002149 0.000506 0.1 sigmoid {'C': 0.1, 'kernel': 'sigmoid'} 0.368421 0.351351 0.351351 0.357041 0.008047 10
4 0.003543 0.000460 0.002302 0.000091 1 linear {'C': 1, 'kernel': 'linear'} 1.000000 0.972973 0.972973 0.981982 0.012741 1
5 0.003509 0.000452 0.002630 0.000460 1 poly {'C': 1, 'kernel': 'poly'} 0.947368 0.945946 0.972973 0.955429 0.012419 6
6 0.003940 0.000466 0.002706 0.000336 1 rbf {'C': 1, 'kernel': 'rbf'} 0.973684 0.972973 0.972973 0.973210 0.000335 3
7 0.004211 0.000627 0.003025 0.000451 1 sigmoid {'C': 1, 'kernel': 'sigmoid'} 0.368421 0.351351 0.351351 0.357041 0.008047 10
8 0.003760 0.000423 0.002659 0.000333 10 linear {'C': 10, 'kernel': 'linear'} 0.947368 0.972973 0.918919 0.946420 0.022078 7
9 0.004453 0.000755 0.002508 0.000103 10 poly {'C': 10, 'kernel': 'poly'} 0.921053 0.972973 0.918919 0.937648 0.024994 8
10 0.004332 0.000757 0.003176 0.000364 10 rbf {'C': 10, 'kernel': 'rbf'} 1.000000 0.972973 0.972973 0.981982 0.012741 1
11 0.005562 0.000661 0.003428 0.000818 10 sigmoid {'C': 10, 'kernel': 'sigmoid'} 0.210526 0.297297 0.243243 0.250356 0.035779 12
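After the distributed grid search, the usual scikit-learn attributes hold the winning configuration:

cv.best_params_  # e.g. {'C': 1, 'kernel': 'linear'}
cv.best_score_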
[97]:
url = 'https://bit.ly/3eoBK6t'
iris = pd.read_csv(url)

X = iris.iloc[:, :-1]
y = iris.species.astype('category').cat.codes

X_train, X_test, y_train, y_test = train_test_split(X, y)
[98]:
n, p = X_train.shape
[99]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score
[100]:
import optuna

class Objective(object):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __call__(self, trial):
        X, y = self.X, self.y # load data once only

        criterion = trial.suggest_categorical('criterion', ['gini', 'entropy'])
        n_estimators = trial.suggest_int('n_estimators', 50, 201)
        max_features = trial.suggest_float('max_features', 1/p, 1)
        max_depth = trial.suggest_float('max_depth', 1, 128, log=True)
        min_samples_split = trial.suggest_int('min_samples_split', 2, 11, 1)
        min_samples_leaf = trial.suggest_int('min_samples_leaf', 1, 11, 1)

        clf = RandomForestClassifier(
            criterion = criterion,
            n_estimators = n_estimators,
            max_features = max_features,
            max_depth = max_depth,
            min_samples_split = min_samples_split,
            min_samples_leaf = min_samples_leaf,
        )

        score = cross_val_score(clf, X, y,  cv=5).mean()
        return score

objective =  Objective(X_train, y_train)
study = optuna.create_study(direction='maximize', storage='sqlite:///memory')

n_jobs = 4
with parallel_backend('spark', n_jobs=n_jobs):
    study.optimize(objective, n_trials=100, n_jobs=n_jobs)
[I 2020-11-04 12:47:30,511] A new study created in RDB with name: no-name-97aab8f6-8b87-4d03-afe2-5f85415b9671
[101]:
study.trials_dataframe().head(10)
[101]:
number value datetime_start datetime_complete duration params_criterion params_max_depth params_max_features params_min_samples_leaf params_min_samples_split params_n_estimators state
0 0 0.804348 2020-11-04 12:47:31.659174 2020-11-04 12:47:32.204716 0 days 00:00:00.545542 entropy 1.966860 0.457199 9 8 74 COMPLETE
1 1 0.964032 2020-11-04 12:47:31.659435 2020-11-04 12:47:32.745937 0 days 00:00:01.086502 gini 1.792140 0.997070 1 10 147 COMPLETE
2 2 0.955336 2020-11-04 12:47:31.659935 2020-11-04 12:47:32.810389 0 days 00:00:01.150454 entropy 118.759561 0.262684 1 8 146 COMPLETE
3 3 0.964427 2020-11-04 12:47:31.660439 2020-11-04 12:47:32.619121 0 days 00:00:00.958682 gini 4.435854 0.761082 7 4 128 COMPLETE
4 4 0.964427 2020-11-04 12:47:33.254434 2020-11-04 12:47:34.555648 0 days 00:00:01.301214 entropy 15.285156 0.795445 6 10 180 COMPLETE
5 5 0.937154 2020-11-04 12:47:33.691416 2020-11-04 12:47:34.149519 0 days 00:00:00.458103 gini 9.179145 0.301613 10 3 60 COMPLETE
6 6 0.964427 2020-11-04 12:47:33.780986 2020-11-04 12:47:34.522994 0 days 00:00:00.742008 gini 21.264760 0.675328 5 8 102 COMPLETE
7 7 0.964427 2020-11-04 12:47:33.831960 2020-11-04 12:47:34.710602 0 days 00:00:00.878642 gini 4.772650 0.518421 2 9 119 COMPLETE
8 8 0.955336 2020-11-04 12:47:35.174596 2020-11-04 12:47:36.389685 0 days 00:00:01.215089 entropy 30.773008 0.600859 8 5 175 COMPLETE
9 9 0.855336 2020-11-04 12:47:35.531175 2020-11-04 12:47:36.231259 0 days 00:00:00.700084 entropy 1.937602 0.955016 6 10 99 COMPLETE

Using Spark with a non-MLLib classifier

While there is no easy way to train a general ML model in parallel, we can use Spark for distributed predictions once the model is trained.

[102]:
url = 'https://bit.ly/3eoBK6t'
iris = pd.read_csv(url)

X = iris.iloc[:, :-1]
y = iris.species.astype('category').cat.codes

X_train, X_test, y_train, y_test = train_test_split(X, y)

We use the best parameters found by Optuna in the distributed hyperparameter optimization above.

[103]:
study.best_params
[103]:
{'criterion': 'gini',
 'max_depth': 4.435854141101209,
 'max_features': 0.7610816811817611,
 'min_samples_leaf': 7,
 'min_samples_split': 4,
 'n_estimators': 128}
[104]:
rf = RandomForestClassifier(**study.best_params, random_state=0)
[105]:
rf.fit(X_train, y_train)
[105]:
RandomForestClassifier(max_depth=4.435854141101209,
                       max_features=0.7610816811817611, min_samples_leaf=7,
                       min_samples_split=4, n_estimators=128, random_state=0)
[106]:
from joblib import dump, load

dump(rf, 'rf.joblib')
[106]:
['rf.joblib']
[107]:
def predict(iterator):
    model = load('rf.joblib')  # the model is loaded once per partition, not per row
    for features in iterator:
        yield pd.DataFrame(model.predict(features))
[108]:
df = spark.createDataFrame(X_test)
[109]:
help(df.mapInPandas)
Help on method mapInPandas in module pyspark.sql.pandas.map_ops:

mapInPandas(func, schema) method of pyspark.sql.dataframe.DataFrame instance
    Maps an iterator of batches in the current :class:`DataFrame` using a Python native
    function that takes and outputs a pandas DataFrame, and returns the result as a
    :class:`DataFrame`.

    The function should take an iterator of `pandas.DataFrame`\s and return
    another iterator of `pandas.DataFrame`\s. All columns are passed
    together as an iterator of `pandas.DataFrame`\s to the function and the
    returned iterator of `pandas.DataFrame`\s are combined as a :class:`DataFrame`.
    Each `pandas.DataFrame` size can be controlled by
    `spark.sql.execution.arrow.maxRecordsPerBatch`.

    :param func: a Python native function that takes an iterator of `pandas.DataFrame`\s, and
        outputs an iterator of `pandas.DataFrame`\s.
    :param schema: the return type of the `func` in PySpark. The value can be either a
        :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.

    >>> from pyspark.sql.functions import pandas_udf
    >>> df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
    >>> def filter_func(iterator):
    ...     for pdf in iterator:
    ...         yield pdf[pdf.id == 1]
    >>> df.mapInPandas(filter_func, df.schema).show()  # doctest: +SKIP
    +---+---+
    | id|age|
    +---+---+
    |  1| 21|
    +---+---+

    .. seealso:: :meth:`pyspark.sql.functions.pandas_udf`

    .. note:: Experimental

    .. versionadded:: 3.0

[110]:
pred = df.mapInPandas(predict, 'pred double')
[111]:
rf.predict(X_test)[:5]
[111]:
array([2, 1, 1, 1, 1], dtype=int8)
[112]:
pred.show(5)
+----+
|pred|
+----+
| 2.0|
| 1.0|
| 1.0|
| 1.0|
| 1.0|
+----+
only showing top 5 rows
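Note that the distributed predictions agree with the local rf.predict results above.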
