import numpy as np

Spark on a local machine using 4 cores

Started with

SPARK_WORKER_MEMORY=512m MASTER=local[4] IPYTHON_OPTS="notebook" pyspark

If you have a Spark cluster, just set

MASTER=spark://IP:PORT

Everything else works the same way.
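
For example, to point the same notebook session at a standalone cluster whose master listens on the default port 7077, only the MASTER value changes (the address below is a placeholder):

# spark://192.168.1.100:7077 is a hypothetical master URL; substitute your own
SPARK_WORKER_MEMORY=512m MASTER=spark://192.168.1.100:7077 IPYTHON_OPTS="notebook" pyspark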

Using Spark in standalone programs

from pyspark import SparkConf, SparkContext
conf = (SparkConf()
         .setMaster("local[4]")
         .setAppName("STA663")
         .set("spark.executor.memory", "4g"))
sc = SparkContext(conf=conf)

Check that the SparkContext object is available.

sc
<pyspark.context.SparkContext at 0x10fecee10>
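
A few attributes of the SparkContext give a quick check of the configuration actually in use (the values depend on your setup):

print sc.version             # Spark version string
print sc.master              # master URL, e.g. local[4]
print sc.defaultParallelism  # default number of partitions for sc.parallelize etc.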

Introduction to Spark concepts with a data manipulation example

Adapted from the Scala version in Chapter 2, Introduction to Data Analysis with Scala and Spark, of Advanced Analytics with Spark (O'Reilly, 2015)

import os

if not os.path.exists('documentation'):
    ! curl -o documentation https://archive.ics.uci.edu/ml/machine-learning-databases/00210/documentation
if not os.path.exists('donation.zip'):
    ! curl -o donation.zip https://archive.ics.uci.edu/ml/machine-learning-databases/00210/donation.zip
! unzip -n -q donation.zip
! unzip -n -q 'block_*.zip'
if not os.path.exists('linkage'):
    ! mkdir linkage
! mv block_*.csv linkage
! rm block_*.zip
10 archives were successfully processed.

Info about the data set

Please see the documentation file.

If we are running Spark on a Hadoop cluster, we first need to transfer the files to HDFS; in local mode, sc.textFile simply reads the local linkage directory

! hadoop fs -mkdir linkage
! hadoop fs -put block_*.csv linkage
rdd = sc.textFile('linkage')
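
The number of partitions determines how much parallelism we get when operating on the RDD; for a directory of text files it roughly tracks the number of files (and, on HDFS, their blocks):

rdd.getNumPartitions()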

Actions trigger execution and return a non-RDD result

rdd.first()
u'"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"'
rdd.take(10)
[u'"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"',
 u'37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE',
 u'39086,47614,1,?,1,?,1,1,1,1,1,TRUE',
 u'70031,70237,1,?,1,?,1,1,1,1,1,TRUE',
 u'84795,97439,1,?,1,?,1,1,1,1,1,TRUE',
 u'36950,42116,1,?,1,1,1,1,1,1,1,TRUE',
 u'42413,48491,1,?,1,?,1,1,1,1,1,TRUE',
 u'25965,64753,1,?,1,?,1,1,1,1,1,TRUE',
 u'49451,90407,1,?,1,?,1,1,1,1,0,TRUE',
 u'39932,40902,1,?,1,?,1,1,1,1,1,TRUE']
def is_header(line):
    return "id_1" in line

Transformations return an RDD and are lazy

vals = rdd.filter(lambda x: not is_header(x))
vals
PythonRDD[4] at RDD at PythonRDD.scala:42
vals.count()
5749132

count() is an action, so it forces the transformations to be evaluated

vals.take(10)
[u'37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE',
 u'39086,47614,1,?,1,?,1,1,1,1,1,TRUE',
 u'70031,70237,1,?,1,?,1,1,1,1,1,TRUE',
 u'84795,97439,1,?,1,?,1,1,1,1,1,TRUE',
 u'36950,42116,1,?,1,1,1,1,1,1,1,TRUE',
 u'42413,48491,1,?,1,?,1,1,1,1,1,TRUE',
 u'25965,64753,1,?,1,?,1,1,1,1,1,TRUE',
 u'49451,90407,1,?,1,?,1,1,1,1,0,TRUE',
 u'39932,40902,1,?,1,?,1,1,1,1,1,TRUE',
 u'46626,47940,1,?,1,?,1,1,1,1,1,TRUE']

Each time we run an action on vals, it is reconstructed from the original sources

Spark maintains a DAG (the lineage) of how each RDD was constructed, so that any partition can be recomputed from the original sources if needed - hence resilient distributed datasets. However, recomputing vals from the text files on every access is inefficient.

# vals is reconstructed again
vals.first()
u'37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE'
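
The lineage Spark keeps for vals can be inspected directly; toDebugString shows the chain of RDDs that would be recomputed:

print vals.toDebugString()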

Spark allows us to persist RDDs that we will be re-using

vals.cache()
PythonRDD[4] at RDD at PythonRDD.scala:42
# the first action after cache() still computes vals but also stores it; subsequent actions read it from memory
vals.take(10)
[u'37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE',
 u'39086,47614,1,?,1,?,1,1,1,1,1,TRUE',
 u'70031,70237,1,?,1,?,1,1,1,1,1,TRUE',
 u'84795,97439,1,?,1,?,1,1,1,1,1,TRUE',
 u'36950,42116,1,?,1,1,1,1,1,1,1,TRUE',
 u'42413,48491,1,?,1,?,1,1,1,1,1,TRUE',
 u'25965,64753,1,?,1,?,1,1,1,1,1,TRUE',
 u'49451,90407,1,?,1,?,1,1,1,1,0,TRUE',
 u'39932,40902,1,?,1,?,1,1,1,1,1,TRUE',
 u'46626,47940,1,?,1,?,1,1,1,1,1,TRUE']
vals.take(10)
[u'37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE',
 u'39086,47614,1,?,1,?,1,1,1,1,1,TRUE',
 u'70031,70237,1,?,1,?,1,1,1,1,1,TRUE',
 u'84795,97439,1,?,1,?,1,1,1,1,1,TRUE',
 u'36950,42116,1,?,1,1,1,1,1,1,1,TRUE',
 u'42413,48491,1,?,1,?,1,1,1,1,1,TRUE',
 u'25965,64753,1,?,1,?,1,1,1,1,1,TRUE',
 u'49451,90407,1,?,1,?,1,1,1,1,0,TRUE',
 u'39932,40902,1,?,1,?,1,1,1,1,1,TRUE',
 u'46626,47940,1,?,1,?,1,1,1,1,1,TRUE']
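
cache() is shorthand for persisting in memory only; persist accepts other storage levels, for example spilling partitions that do not fit in memory to disk. A minimal sketch:

from pyspark import StorageLevel

vals.unpersist()                            # drop the cached copy
vals.persist(StorageLevel.MEMORY_AND_DISK)  # re-persist, spilling to disk if needed
vals.is_cached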

Parse lines and work on them

def parse(line):
    pieces = line.strip().split(',')
    # first two fields are the record ids being compared
    id1, id2 = map(int, pieces[:2])
    # nine similarity scores; '?' marks a missing value
    scores = [np.nan if p == '?' else float(p) for p in pieces[2:11]]
    # last field is the match flag
    matched = pieces[11] == 'TRUE'
    return [id1, id2, scores, matched]
mds = vals.map(parse)
mds.cache()
PythonRDD[10] at RDD at PythonRDD.scala:42
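
As a quick sanity check, the first parsed record should line up with the first raw line we saw earlier: the ids as ints, '?' turned into nan, and the match flag as a boolean.

mds.first()
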
match_counts = mds.map(lambda x: x[-1]).countByValue()
for cls in match_counts:
    print cls, match_counts[cls]
False 5728201
True 20931
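
countByValue is a convenience action; the same tally can be written as a key-value transformation followed by reduceByKey, which is the pattern to use when the result should itself remain an RDD. A sketch:

from operator import add

mds.map(lambda x: (x[-1], 1)).reduceByKey(add).collect()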

Summary statistics

mds.map(lambda x: x[2][0]).stats()
(count: 5749132, mean: nan, stdev: nan, max: nan, min: nan)
mds.filter(lambda x: np.isfinite(x[2][0])).map(lambda x: x[2][0]).stats()
(count: 5748125, mean: 0.712902470443, stdev: 0.3887583258, max: 1.0, min: 0.0)
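
The distribution of a score can also be examined with the histogram action; here a sketch with 10 equal-width bins over the finite values of the first score (the exact counts depend on the data):

mds.filter(lambda x: np.isfinite(x[2][0])).map(lambda x: x[2][0]).histogram(10)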

Takes too long on a laptop - skip

stats = [mds.filter(lambda x: np.isfinite(x[2][i])).map(lambda x: x[2][i]).stats() for i in range(3)]

for stat in stats: print stat
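
The list comprehension above launches a separate Spark job for each column. The same kind of per-column information can be gathered in a single pass; for example, the number of non-missing values in every score column can be computed with one map and one reduce over numpy arrays (a sketch of the idea):

# element-wise sum of 0/1 "not missing" indicators, one entry per score column
finite_counts = mds.map(lambda x: np.isfinite(x[2]).astype(int)).reduce(lambda a, b: a + b)
finite_counts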

Using MLlib for logistic regression

Adapted from the example in the Spark documentation.

from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint

def parsePoint(md, idxs):
    # label is the match flag; features are only the selected score columns
    return LabeledPoint(md[-1], np.array(md[2])[idxs])

full_count = mds.count()

# Only use columns with less than 20% missing as features
idxs = [i for i in range(9) if
        mds.filter(lambda p: np.isfinite(p[2][i])).count() > 0.8*full_count]

data = mds.filter(lambda p: np.all(np.isfinite(np.array(p[2])[idxs]))).map(lambda p: parsePoint(p, idxs))
train_data, predict_data = data.randomSplit([0.9, 0.1])

model = LogisticRegressionWithSGD.train(train_data)

labelsAndPreds = predict_data.map(lambda p: (p.label, model.predict(p.features)))
testErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / float(predict_data.count())

print "Test Error = " + str(testErr)
print idxs
[0, 2, 4, 5, 6, 7, 8]
print train_data.count(), predict_data.count()
5160175 574313
print data.count(), train_data.count(), predict_data.count()
5734488 5160175 574313
Test Error = 0.00356774093569
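
A single error rate says little when matches are this rare; the same labelsAndPreds RDD can be tallied into a confusion matrix with plain RDD operations (a sketch):

confusion = labelsAndPreds.countByValue()
for label, pred in confusion:
    print label, pred, confusion[(label, pred)]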