Using Spark¶
With massive data, we need to load, extract, transform and analyze the data on multiple computers to overcome I/O and processing bottlenecks. However, when working on multiple computers (possibly hundreds to thousands), there is a high risk of failure in one or more nodes. Distributed computing frameworks are designed to handle failures gracefully, allowing the developer to focus on algorithm development rather than system administration.
The first such widely used open source framework was the Hadoop MapReduce framework. This provided transparent fault tolerance, and popularized the functional programming approach to distributed computing. The Hadoop workflow uses repeated invocations of the following instructions (a minimal plain-Python sketch of this pattern is given below):
load dataset from disk to memory
map function to elements of dataset
reduce results of map to get new aggregate dataset
save new dataset to disk
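To make the pattern concrete, here is a minimal word count written in plain Python following the same load / map / reduce / save steps. This is only an illustration of the programming model, not Hadoop code; the input path is the Ulysses text used later in these notes, and the output file name is arbitrary.

from collections import defaultdict

# load dataset from disk to memory
with open('data/Ulysses.txt') as f:
    lines = f.readlines()

# map: emit a (word, 1) pair for every word in every line
pairs = [(word, 1) for line in lines for word in line.split()]

# reduce: aggregate the values for each key to get a new dataset
counts = defaultdict(int)
for word, n in pairs:
    counts[word] += n

# save new dataset to disk
with open('word_counts.txt', 'w') as f:
    for word, n in counts.items():
        f.write('%s\t%d\n' % (word, n))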
Hadoop has two main limitations:
- the repeated saving and loading of data to disk can be slow, and makes interactive development very challenging
- restriction to only map and reduce constructs results in increased code complexity, since every problem must be tailored to the map-reduce format
Spark is a more recent framework for distributed computing that addresses the limitations of Hadoop by allowing the use of in-memory datasets for iterative computation, and providing a rich set of functional programming constructs to make the developer’s job easier. Spark also provides libraries for common big data tasks, such as running SQL queries, performing machine learning and processing large graphs.
Local installation of Spark¶
These notes are based on a local installation of Spark. Usage of Spark on the OIT docker cluster is slightly different.
Note: This is a local installation. That means that it is intended for use with a local copy of Jupyter, and is independent of the Docker container on the OIT website. You would get access to this by opening your local copy of Jupyter (that you presumably installed using the Anaconda distribution), either from the Anaconda launcher or by opening a terminal and typing jupyter notebook. Then you can open this notebook and run it.
If you are not clear about what is being described, please don’t attempt this. Use the Docker notebook for pyspark that OIT will provide.
If you want to install locally, see the instructions at http://spark.apache.org/downloads.html. It is simplest to use the provided binaries:
cd ~
wget http://d3kbcqa49mib13.cloudfront.net/spark-2.1.0-bin-hadoop2.7.tgz
tar xzf spark-2.1.0-bin-hadoop2.7.tgz
mv spark-2.1.0-bin-hadoop2.7 spark
Next, install py4j:
pip install py4j
Finally, edit your .bashrc or .bash_profile to include
export SPARK_HOME=~/spark
export PYSPARK_PYTHON=python3
export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
export PYSPARK_SUBMIT_ARGS="--packages ${PACKAGES} pyspark-shell"
Start a Spark session¶
from pyspark import SparkContext
sc = SparkContext(master = 'local[*]')
SparkContext¶
A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster. Here we set it up to use local nodes: the argument local[*] means to use the local machine as the cluster, using as many worker threads as there are cores. You can also explicitly set the number of cores with local[k], where k is an integer.
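For example, to use exactly two worker threads we could have started the context as follows. Only one SparkContext can be active per session, so this is shown commented out rather than executed here:

# sc.stop()                              # stop the running context first
# sc = SparkContext(master='local[2]')   # local cluster with exactly 2 worker threads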
sc.version
'1.6.1'
sc.pythonVer
'3.5'
sc.master
'local[*]'
sc.sparkUser()
'cliburn'
sc.appName
'pyspark-shell'
sc.defaultParallelism
4
Actions and transforms with parallelized collections¶
A transform maps an RDD to another RDD - it is a lazy operation that only changes the directed acyclic graph (DAG) representation. To actually perform any work, we need to apply an action.
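For example, defining a transform returns immediately without touching the data; only the action at the end of the chain triggers any computation. A minimal sketch, using the same kind of parallelized list as in the next section:

# nothing is computed here - we have only recorded the work to be done
lazy = sc.parallelize(range(10)).map(lambda x: x**2)

# collect() is an action, so this line triggers the actual computation
lazy.collect()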
Simple lists¶
rdd = sc.parallelize(range(10))
rdd.getNumPartitions()
4
rdd.count()
10
rdd.sum()
45
rdd.min(), rdd.max(), rdd.stdev(), rdd.variance()
(0, 9, 2.8722813232690143, 8.25)
rdd.stats()
(count: 10, mean: 4.5, stdev: 2.87228132327, max: 9.0, min: 0.0)
Getting values¶
rdd.first()
0
rdd.top(3)
[9, 8, 7]
rdd.take(3)
[0, 1, 2]
rdd.sample(withReplacement=False, fraction=.5, seed=2).collect()
[3, 6, 7, 9]
res = rdd.sample(withReplacement=False, fraction=.5, seed=2)
res.collect()
[3, 6, 7, 9]
rdd.collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Applying functions¶
xs = rdd.filter(lambda x: x % 2 == 0)
xs.collect()
[0, 2, 4, 6, 8]
xs = rdd.map(lambda x: x**2)
xs.collect()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
xs = rdd.flatMap(lambda x: (x, x*x))
xs.collect()
[0, 0, 1, 1, 2, 4, 3, 9, 4, 16, 5, 25, 6, 36, 7, 49, 8, 64, 9, 81]
rdd.fold(0, lambda a, b: a + b)
45
rdd.reduce(lambda a, b: a + b)
45
Using aggregate¶
Count
rdd.aggregate(0, lambda acc, _: acc + 1, lambda a, b: a+b)
10
Sum
rdd.aggregate(0, lambda a, b: a + b, lambda a, b: a + b)
45
Think of aggregate as first doing a transform-and-fold within each partition (the second argument), then a reduce that combines the per-partition results (the third argument). A sketch that uses aggregate to compute a mean follows the word-length example below.
ss = sc.parallelize('the quick brown fox jumps over the lazy dog'.split())
ss.aggregate(0, lambda acc, s: acc + len(s), lambda a, b: a + b)
35
len(''.join('the quick brown fox jumps over the lazy dog'.split()))
35
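Because the accumulator can have a different type from the RDD elements, aggregate can also carry several running quantities at once. Here is a minimal sketch (not in the original notes) that computes the mean of rdd by accumulating a (sum, count) pair within each partition and then combining the per-partition pairs:

total, n = rdd.aggregate(
    (0, 0),
    lambda acc, x: (acc[0] + x, acc[1] + 1),    # fold elements within a partition
    lambda a, b: (a[0] + b[0], a[1] + b[1]),    # combine the per-partition pairs
)
total / n   # 4.5, matching the mean reported by rdd.stats() above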
Chaining¶
(rdd.
 map(lambda x: x+1).
 fold(1, lambda a, b: a*b))
3628800
(rdd.
 map(lambda x: x+1).
 reduce(lambda a, b: a*b))
3628800
(
    sc.parallelize(range(10))
    .filter(lambda x: x % 2 == 0)
    .map(lambda x: x**2)
    .collect()
)
[0, 4, 16, 36, 64]
Key-value pairs¶
import string
rdd = sc.parallelize(zip(2*string.ascii_lowercase[:5], range(10)))
rdd.collect()
[('a', 0),
('b', 1),
('c', 2),
('d', 3),
('e', 4),
('a', 5),
('b', 6),
('c', 7),
('d', 8),
('e', 9)]
rdd.keys().collect()
['a', 'b', 'c', 'd', 'e', 'a', 'b', 'c', 'd', 'e']
rdd.values().collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Convert to dictionary¶
d = rdd.collectAsMap()
list(d.items())[:5]
[('c', 7), ('d', 8), ('e', 9), ('a', 5), ('b', 6)]
Functions for key-value pairs¶
ys = rdd.mapValues(lambda x: x**2)
ys.collect()
[('a', 0),
('b', 1),
('c', 4),
('d', 9),
('e', 16),
('a', 25),
('b', 36),
('c', 49),
('d', 64),
('e', 81)]
rdd.countByKey()
defaultdict(int, {'a': 2, 'b': 2, 'c': 2, 'd': 2, 'e': 2})
rdd.countByValue()
defaultdict(int,
{('a', 0): 1,
('a', 5): 1,
('b', 1): 1,
('b', 6): 1,
('c', 2): 1,
('c', 7): 1,
('d', 3): 1,
('d', 8): 1,
('e', 4): 1,
('e', 9): 1})
ys = rdd.reduceByKey(lambda x, y: x + y)
ys.collect()
[('c', 9), ('b', 7), ('a', 5), ('e', 13), ('d', 11)]
Working with text¶
ulysses = sc.textFile('data/Ulysses.txt')
ulysses.take(10)
['The Project Gutenberg EBook of Ulysses, by James Joyce',
'',
'This eBook is for the use of anyone anywhere at no cost and with',
'almost no restrictions whatsoever. You may copy it, give it away or',
're-use it under the terms of the Project Gutenberg License included',
'with this eBook or online at www.gutenberg.org',
'',
'',
'Title: Ulysses',
'']
def tokenize(line):
    table = dict.fromkeys(map(ord, string.punctuation))
    return line.translate(table).lower().split()
words = ulysses.flatMap(lambda line: tokenize(line))
words.take(10)
['the',
'project',
'gutenberg',
'ebook',
'of',
'ulysses',
'by',
'james',
'joyce',
'this']
Word count if you want all words¶
counts = words.map(lambda x: (x, 1)).countByKey()
sorted(counts.items(), key=lambda x: x[1], reverse=True)[:10]
[('the', 15107),
('of', 8257),
('and', 7282),
('a', 6553),
('to', 5042),
('in', 4981),
('he', 4033),
('his', 3333),
('i', 2698),
('that', 2621)]
Word count without returning all words to memory¶
words1 = words.map(lambda x: (x, 1))
words1.take(5)
[('the', 1), ('project', 1), ('gutenberg', 1), ('ebook', 1), ('of', 1)]
counts = words1.reduceByKey(lambda a, b: a + b)
counts.take(5)
[('kyries', 1),
('mobile', 2),
('gasteropod', 1),
('circle', 20),
('calamitous', 1)]
counts.takeOrdered(10, key=lambda x: -x[1])
[('the', 15107),
('of', 8257),
('and', 7282),
('a', 6553),
('to', 5042),
('in', 4981),
('he', 4033),
('his', 3333),
('i', 2698),
('that', 2621)]
Using a chain¶
(
sc.textFile('data/Ulysses.txt').
flatMap(lambda line: tokenize(line)).
map(lambda x: (x, 1)).
reduceByKey(lambda a, b: a + b).
takeOrdered(10, key=lambda x: -x[1])
)
[('the', 15107),
('of', 8257),
('and', 7282),
('a', 6553),
('to', 5042),
('in', 4981),
('he', 4033),
('his', 3333),
('i', 2698),
('that', 2621)]
Persisting data¶
The word count chain above will repeat ALL the computations each time we take an action such as takeOrdered. We need to persist or cache the results - they are similar, except that persist gives more control over how the data is retained.
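For instance, persist accepts an explicit StorageLevel, while cache always uses the default memory-only level. A minimal sketch, using a throwaway RDD so that counts below still gets the default level:

from pyspark import StorageLevel

tmp = sc.parallelize(range(1000))
tmp.persist(StorageLevel.MEMORY_AND_DISK)   # spill partitions to disk if they do not fit in memory
tmp.count()                                 # the first action materializes and caches the data
tmp.unpersist()                             # release the cached partitions when no longer needed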
counts.is_cached
False
counts.persist()
PythonRDD[62] at RDD at PythonRDD.scala:43
counts.is_cached
True
counts.takeOrdered(5, lambda x: -x[1])
[('the', 15107), ('of', 8257), ('and', 7282), ('a', 6553), ('to', 5042)]
counts.takeOrdered(5, lambda x: x[1])
[('kyries', 1),
('gasteropod', 1),
('calamitous', 1),
('kneecap', 1),
('riotously', 1)]
count_dict = counts.collectAsMap()
count_dict['circle']
20
Merging key, value datasets¶
We will build a second key: value RDD of counts from another of Joyce’s works - A Portrait of the Artist as a Young Man.
portrait = sc.textFile('data/Portrait.txt')
portrait.take(10)
["Project Gutenberg's A Portrait of the Artist as a Young Man, by James Joyce",
'',
'This eBook is for the use of anyone anywhere at no cost and with',
'almost no restrictions whatsoever. You may copy it, give it away or',
're-use it under the terms of the Project Gutenberg License included',
'with this eBook or online at www.gutenberg.net',
'',
'',
'Title: A Portrait of the Artist as a Young Man',
'']
counts1 = (
portrait.flatMap(lambda line: tokenize(line))
.map(lambda x: (x, 1))
.reduceByKey(lambda x,y: x+y)
)
counts1.persist()
PythonRDD[72] at RDD at PythonRDD.scala:43
counts1.take(5)
[('desisting', 1),
('inaction', 1),
('mobile', 1),
('ablative', 1),
('vastness', 1)]
joined = counts.join(counts1)
joined.take(5)
[('mobile', (2, 1)),
('circle', (20, 1)),
('temptations', (1, 4)),
('withering', (4, 1)),
('spoken', (16, 15))]
joined2 = joined.mapValues(lambda x: x[0] + x[1])
joined2.take(5)
[('mobile', 3),
('circle', 21),
('temptations', 5),
('withering', 5),
('spoken', 31)]
import numpy as np

joined3 = joined.mapValues(lambda x: np.mean(x))
joined3.take(5)
[('mobile', 1.5),
('circle', 10.5),
('temptations', 2.5),
('withering', 2.5),
('spoken', 15.5)]
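Note that join keeps only the words that occur in both books. If we also wanted the words unique to one of the texts, we could use a full outer join instead; a sketch (not run in the original notes), where a missing count comes back as None:

all_words = counts.fullOuterJoin(counts1)
all_totals = all_words.mapValues(lambda x: (x[0] or 0) + (x[1] or 0))   # treat None as 0
all_totals.take(5)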