Using Spark

With massive data, we need to load, extract, transform and analyze the data on multiple computers to overcome I/O and processing bottlenecks. However, when working on multiple computers (possibly hundreds to thousands), there is a high risk of failure in one or more nodes. Distributed computing frameworks are designed to handle failures gracefully, allowing the developer to focus on algorithm development rather than system administration.

The first such widely used open source framework was the Hadoop MapReduce framework. It provided transparent fault tolerance and popularized the functional programming approach to distributed computing. The Hadoop workflow uses repeated invocations of the following instructions:

load dataset from disk to memory
map function to elements of dataset
reduce results of map to get new aggregate dataset
save new dataset to disk
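
Schematically, one Hadoop-style pass for a word count might look like the following plain-Python sketch (load_from_disk and save_to_disk are hypothetical helpers standing in for the HDFS read and write steps):

lines = load_from_disk('input')                                  # load (hypothetical helper)
pairs = [(word, 1) for line in lines for word in line.split()]   # map
counts = {}
for word, n in pairs:                                            # reduce
    counts[word] = counts.get(word, 0) + n
save_to_disk(counts, 'output')                                   # save (hypothetical helper)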

Hadoop has two main limitations:

  • the repeated saving and loading of data to disk can be slow, and makes interactive development very challenging
  • restriction to only map and reduce constructs results in increased code complexity, since every problem must be tailored to the map-reduce format

Spark is a more recent framework for distributed computing that addresses the limitations of Hadoop by allowing the use of in-memory datasets for iterative computation, and by providing a rich set of functional programming constructs to make the developer’s job easier. Spark also provides libraries for common big data tasks, such as running SQL queries (Spark SQL), performing machine learning (MLlib) and processing large graphs (GraphX).

Cheat sheet

Spark cheat sheet

Spark and cluster computing

Spark components

Local installation of Spark

These notes are based on a local installation of Spark. Usage of Spark on the OIT docker cluster is slightly different.

Note: This is a local installation, intended for use with a local copy of Jupyter, and is independent of the Docker container on the OIT website. To use it, open your local copy of Jupyter (presumably installed via the Anaconda distribution), either from the Anaconda launcher or by opening a terminal and typing jupyter notebook, and then open and run this notebook.

If you are not clear about what is being described, please don’t attempt this. Use the Docker notebook for pyspark that OIT will provide.

If you want to install locally, see the instructions at http://spark.apache.org/downloads.html. It is simplest to use the provided binaries:

cd ~
wget http://d3kbcqa49mib13.cloudfront.net/spark-2.1.0-bin-hadoop2.7.tgz
tar xzf spark-2.1.0-bin-hadoop2.7.tgz
mv spark-2.1.0-bin-hadoop2.7 spark

Next, install py4j:

pip install py4j

Finally, edit your .bashrc or .bash_profile to include

export SPARK_HOME=~/spark
export PYSPARK_PYTHON=python3
export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
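# ${PACKAGES} is assumed to be a comma-separated list of extra Spark packages
# defined elsewhere; if you need none, "pyspark-shell" alone is sufficient below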
export PYSPARK_SUBMIT_ARGS="--packages ${PACKAGES} pyspark-shell"

Start a Spark context

from pyspark import SparkContext
sc = SparkContext(master = 'local[*]')

Checking Spark status

If you are running Spark locally, you can check the status of your Spark environment and jobs using the Spark UI, which is served at http://localhost:4040 by default while the SparkContext is active.

SparkContext

A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster. Here we set it up to run locally - the argument local[*] means to use the local machine as the cluster, with as many worker threads as there are cores. You can also set the number of worker threads explicitly with local[k], where k is an integer.
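
For example, if you had not already created sc above, you could ask for exactly 2 worker threads (the app name here is arbitrary and just for illustration):

from pyspark import SparkContext
sc = SparkContext(master='local[2]', appName='spark-demo')  # only one SparkContext can be active per process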

sc.version
'1.6.1'
sc.pythonVer
'3.5'
sc.master
'local[*]'
sc.sparkUser()
'cliburn'
sc.appName
'pyspark-shell'
sc.defaultParallelism
4

Actions and transforms with parallelized collections

A transformation maps an RDD to another RDD - it is a lazy operation that only changes the directed acyclic graph (DAG) representation of the computation. To actually perform any work, we need to apply an action.
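
For example (a minimal sketch), the map below returns a new RDD immediately without touching the data; only the collect action triggers the computation:

nums = sc.parallelize(range(5))      # a small parallelized collection
doubled = nums.map(lambda x: 2 * x)  # transformation: returns a new RDD instantly, no work is done yet
doubled.collect()                    # action: only now is the DAG executed, returning [0, 2, 4, 6, 8]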

Simple lists

rdd = sc.parallelize(range(10))
rdd.getNumPartitions()
4
rdd.count()
10
rdd.sum()
45
rdd.min(), rdd.max(), rdd.stdev(), rdd.variance()
(0, 9, 2.8722813232690143, 8.25)
rdd.stats()
(count: 10, mean: 4.5, stdev: 2.87228132327, max: 9.0, min: 0.0)

Getting values

rdd.first()
0
rdd.top(3)
[9, 8, 7]
rdd.take(3)
[0, 1, 2]
rdd.sample(withReplacement=False, fraction=.5, seed=2).collect()
[3, 6, 7, 9]
res = rdd.sample(withReplacement=False, fraction=.5, seed=2)
res.collect()
[3, 6, 7, 9]
rdd.collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

See how the data is partitioned

rdd.glom().collect()
[[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]]
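
If you need a different partitioning, repartition and coalesce change the number of partitions (a sketch; repartition performs a full shuffle, while coalesce avoids one when reducing the partition count):

rdd.repartition(2).glom().collect()   # reshuffle the data into 2 partitions
rdd.coalesce(2).glom().collect()      # merge down to 2 partitions without a full shuffle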

Applying functions

xs = rdd.filter(lambda x: x % 2 == 0)
xs.collect()
[0, 2, 4, 6, 8]
xs = rdd.map(lambda x: x**2)
xs.collect()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
xs = rdd.flatMap(lambda x: (x, x*x))
xs.collect()
[0, 0, 1, 1, 2, 4, 3, 9, 4, 16, 5, 25, 6, 36, 7, 49, 8, 64, 9, 81]
rdd.fold(0, lambda a, b: a + b)
45
rdd.reduce(lambda a, b: a + b)
45
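
One caution with fold: the zero value is applied once in every partition and once more when the partition results are combined, so it should be the identity element for the operation. With the 4 partitions above:

rdd.fold(1, lambda a, b: a + b)   # gives 50, not 46: the 1 is added in each of the 4 partitions and once more when merging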

Using aggregate

Count

rdd.aggregate(0, lambda acc, _: acc + 1, lambda a, b: a+b)
10

Sum

rdd.aggregate(0, lambda a, b: a + b, lambda a, b: a + b)
45

Think of aggregate as first applying a function to the elements within each partition (the seqOp) and then reducing the per-partition results (the combOp).

ss = sc.parallelize('the quick brown fox jumps over the lazy dog'.split())
ss.aggregate(0, lambda acc, s: acc + len(s), lambda a, b: a + b)
35
len(''.join('the quick brown fox jumps over the lazy dog'.split()))
35
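
The accumulator can also have a different type from the elements. For example (a sketch), a (sum, count) pair gives the mean of rdd:

sum_count = rdd.aggregate((0, 0),
                          lambda acc, x: (acc[0] + x, acc[1] + 1),   # seqOp: fold an element into the accumulator
                          lambda a, b: (a[0] + b[0], a[1] + b[1]))   # combOp: merge accumulators across partitions
sum_count[0] / sum_count[1]   # 4.5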

Chaining

(rdd.
 map(lambda x: x+1).
 fold(1, lambda a, b: a*b))
3628800
(rdd.
map(lambda x: x+1).
reduce(lambda a, b: a*b))
3628800
(
    sc.parallelize(range(10))
    .filter(lambda x: x % 2 == 0)
    .map(lambda x: x**2)
    .collect()
)
[0, 4, 16, 36, 64]

Key-value pairs

import string
rdd = sc.parallelize(zip(2*string.ascii_lowercase[:5], range(10)))
rdd.collect()
[('a', 0),
 ('b', 1),
 ('c', 2),
 ('d', 3),
 ('e', 4),
 ('a', 5),
 ('b', 6),
 ('c', 7),
 ('d', 8),
 ('e', 9)]
rdd.keys().collect()
['a', 'b', 'c', 'd', 'e', 'a', 'b', 'c', 'd', 'e']
rdd.values().collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Convert to dictionary

d = rdd.collectAsMap()
list(d.items())[:5]
[('c', 7), ('d', 8), ('e', 9), ('a', 5), ('b', 6)]

Note that collectAsMap keeps only one value per key - duplicate keys are silently overwritten, which is why only the later occurrences appear here.

Functions for key-value pairs

ys = rdd.mapValues(lambda x: x**2)
ys.collect()
[('a', 0),
 ('b', 1),
 ('c', 4),
 ('d', 9),
 ('e', 16),
 ('a', 25),
 ('b', 36),
 ('c', 49),
 ('d', 64),
 ('e', 81)]
rdd.countByKey()
defaultdict(int, {'a': 2, 'b': 2, 'c': 2, 'd': 2, 'e': 2})
rdd.countByValue()
defaultdict(int,
            {('a', 0): 1,
             ('a', 5): 1,
             ('b', 1): 1,
             ('b', 6): 1,
             ('c', 2): 1,
             ('c', 7): 1,
             ('d', 3): 1,
             ('d', 8): 1,
             ('e', 4): 1,
             ('e', 9): 1})
ys = rdd.reduceByKey(lambda x, y: x + y)
ys.collect()
[('c', 9), ('b', 7), ('a', 5), ('e', 13), ('d', 11)]
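
A related transformation is groupByKey, which gathers all the values for a key into an iterable (a sketch; reduceByKey is usually preferred because it combines values within each partition before shuffling):

grouped = rdd.groupByKey().mapValues(list)
grouped.collect()   # e.g. [('a', [0, 5]), ('b', [1, 6]), ...] - the order of keys may vary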

Working with text

ulysses = sc.textFile('data/Ulysses.txt')
ulysses.take(10)
['The Project Gutenberg EBook of Ulysses, by James Joyce',
 '',
 'This eBook is for the use of anyone anywhere at no cost and with',
 'almost no restrictions whatsoever.  You may copy it, give it away or',
 're-use it under the terms of the Project Gutenberg License included',
 'with this eBook or online at www.gutenberg.org',
 '',
 '',
 'Title: Ulysses',
 '']
def tokenize(line):
    # strip punctuation, lowercase and split on whitespace
    table = dict.fromkeys(map(ord, string.punctuation))
    return line.translate(table).lower().split()
words = ulysses.flatMap(lambda line: tokenize(line))
words.take(10)
['the',
 'project',
 'gutenberg',
 'ebook',
 'of',
 'ulysses',
 'by',
 'james',
 'joyce',
 'this']

Word count if you want all words

Here countByKey is an action that returns a plain Python dictionary of counts to the driver, so this approach only works when all the distinct words fit in driver memory.

counts = words.map(lambda x: (x, 1)).countByKey()
sorted(counts.items(), key=lambda x: x[1], reverse=True)[:10]
[('the', 15107),
 ('of', 8257),
 ('and', 7282),
 ('a', 6553),
 ('to', 5042),
 ('in', 4981),
 ('he', 4033),
 ('his', 3333),
 ('i', 2698),
 ('that', 2621)]

Word count without returning all words to memory

words1 = words.map(lambda x: (x, 1))
words1.take(5)
[('the', 1), ('project', 1), ('gutenberg', 1), ('ebook', 1), ('of', 1)]
counts = words1.reduceByKey(lambda a, b: a + b)
counts.take(5)
[('kyries', 1),
 ('mobile', 2),
 ('gasteropod', 1),
 ('circle', 20),
 ('calamitous', 1)]
counts.takeOrdered(10, key=lambda x: -x[1])
[('the', 15107),
 ('of', 8257),
 ('and', 7282),
 ('a', 6553),
 ('to', 5042),
 ('in', 4981),
 ('he', 4033),
 ('his', 3333),
 ('i', 2698),
 ('that', 2621)]
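
Other actions can reuse the same counts RDD; for example (a sketch), to get the vocabulary size and the total number of word tokens:

counts.count()                        # number of distinct words
counts.map(lambda kv: kv[1]).sum()    # total number of word occurrences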

Using a chain

(
sc.textFile('data/Ulysses.txt').
flatMap(lambda line: tokenize(line)).
map(lambda x: (x, 1)).
reduceByKey(lambda a, b: a + b).
takeOrdered(10, key=lambda x: -x[1])
)
[('the', 15107),
 ('of', 8257),
 ('and', 7282),
 ('a', 6553),
 ('to', 5042),
 ('in', 4981),
 ('he', 4033),
 ('his', 3333),
 ('i', 2698),
 ('that', 2621)]

Persisting data

The word count pipeline above repeats ALL of the computations each time we apply an action such as takeOrdered. We need to persist or cache the intermediate results - cache and persist are similar, except that persist gives more control over how the data is stored (its storage level).

counts.is_cached
False
counts.persist()
PythonRDD[62] at RDD at PythonRDD.scala:43
counts.is_cached
True
counts.takeOrdered(5, lambda x: -x[1])
[('the', 15107), ('of', 8257), ('and', 7282), ('a', 6553), ('to', 5042)]
counts.takeOrdered(5, lambda x: x[1])
[('kyries', 1),
 ('gasteropod', 1),
 ('calamitous', 1),
 ('kneecap', 1),
 ('riotously', 1)]
count_dict = counts.collectAsMap()
count_dict['circle']
20
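
When you are done with the cached RDD you can release it, or re-persist it with an explicit storage level (a sketch; MEMORY_AND_DISK spills partitions to disk when they do not fit in memory):

from pyspark import StorageLevel

counts.unpersist()                             # drop the cached partitions
counts.persist(StorageLevel.MEMORY_AND_DISK)   # cache again, spilling to disk if necessary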

Merging key, value datasets

We will build a second key-value RDD of word counts from another of Joyce’s works, A Portrait of the Artist as a Young Man.

portrait = sc.textFile('data/Portrait.txt')
portrait.take(10)
["Project Gutenberg's A Portrait of the Artist as a Young Man, by James Joyce",
 '',
 'This eBook is for the use of anyone anywhere at no cost and with',
 'almost no restrictions whatsoever.  You may copy it, give it away or',
 're-use it under the terms of the Project Gutenberg License included',
 'with this eBook or online at www.gutenberg.net',
 '',
 '',
 'Title: A Portrait of the Artist as a Young Man',
 '']
counts1 = (
portrait.flatMap(lambda line: tokenize(line))
        .map(lambda x: (x, 1))
        .reduceByKey(lambda x,y: x+y)
)
counts1.persist()
PythonRDD[72] at RDD at PythonRDD.scala:43
counts1.take(5)
[('desisting', 1),
 ('inaction', 1),
 ('mobile', 1),
 ('ablative', 1),
 ('vastness', 1)]
joined = counts.join(counts1)
joined.take(5)
[('mobile', (2, 1)),
 ('circle', (20, 1)),
 ('temptations', (1, 4)),
 ('withering', (4, 1)),
 ('spoken', (16, 15))]
joined2 = joined.mapValues(lambda x: x[0] + x[1])
joined2.take(5)
[('mobile', 3),
 ('circle', 21),
 ('temptations', 5),
 ('withering', 5),
 ('spoken', 31)]
import numpy as np
joined3 = joined.mapValues(lambda x: np.mean(x))
joined3.take(5)
[('mobile', 1.5),
 ('circle', 10.5),
 ('temptations', 2.5),
 ('withering', 2.5),
 ('spoken', 15.5)]
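
Words that occur in only one of the two books are dropped by join. To keep them, leftOuterJoin or fullOuterJoin retain unmatched keys, with None standing in for the missing count (a sketch):

full = counts.fullOuterJoin(counts1)
only_one_book = full.filter(lambda kv: kv[1][0] is None or kv[1][1] is None)
only_one_book.take(3)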