Introduction to Spark¶
This lecture is an introduction to the Spark framework for distributed computing, the basic data and control flow abstractions, and getting comfortable with the functional programming style needed to writte a Spark application.
- What problem does Spark solve?
- SparkContext and the master configuration
- RDDs
- Actions
- Transforms
- Key-value RDDs
- Example - word count
- Persistence
- Merging key-value RDDs
Learning objectives¶
- Overview of Spark
- Working with Spark RDDs
- Actions and transforms
- Working with Spark DataFrames
- Using the
ml
andmllib
for machine learning
Not covered¶
- Spark GraphX (library for graph algorithms)
- Spark Streaming (library for streaming (microbatch) data)
Installation¶
You should use teh current version of Spark at
https://spark.apache.org/downloads.html. Choose the package
Pre-built for Hadoop2.6 and later
. The instructions below use the
version current as of 11 April 2016.
cd ~
wget http://mirrors.gigenet.com/apache/spark/spark-1.6.1/spark-1.6.1-bin-hadoop2.6.tgz
tar xxf spark-1.6.1-bin-hadoop2.6.tgz
rm spark-1.6.1-bin-hadoop2.6.tgz
mv spark-1.6.1-bin-hadoop2.6 spark
Install the py4j
Python package needed for pyspark
pip install py4j
You need to define these environment variables before starting the notebook.
export SPARK_HOME=~/spark
export PYSPARK_PYTHON=python3
export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
export PACKAGES="com.databricks:spark-csv_2.11:1.4.0"
export PYSPARK_SUBMIT_ARGS="--packages ${PACKAGES} pyspark-shell"
In Unix/Mac, this can be done in .bashrc
or .bash_profile
.
For the adventurous, see Running Spark on an AWS EC2 cluster.
Overview of Spark¶
With massive data, we need to load, extract, transform and analyze the data on multiple computers to overcome I/O and processing bottlenecks. However, when working on multiple computers (possibly hundreds to thousands), there is a high risk of failure in one or more nodes. Distributed computing frameworks are designed to handle failures gracefully, allowing the developer to focus on algorithm development rather than system administration.
The first such widely used open source framework was the Hadoop MapReduce framework. This provided transparent fault tolerance, and popularized the functional programming approach to distributed computing. The Hadoop work-flow uses repeated invocations of the following instructions:
load dataset from disk to memory
map function to elements of dataset
reduce results of map to get new aggregate dataset
save new dataset to disk
Hadoop has two main limitations:
- the repeated saving and loading of data to disk can be slow, and makes interactive development very challenging
- restriction to only
map
andreduce
constructs results in increased code complexity, since every problem must be tailored to themap-reduce
format
Spark is a more recent framework for distributed computing that addresses the limitations of Hadoop by allowing the use of in-memory datasets for iterative computation, and providing a rich set of functional programming constructs to make the developer’s job easier. Spark also provides libraries for common big data tasks, such as the need to run SQL queries, perform machine learning and process large graphical structures.
Architecture of a Spark Application¶
![Spark components](http://spark.apache.org/docs/latest/img/cluster-overview.png)
Spark components
SparkContext¶
A SparkContext represents the connection to a Spark cluster, and can be
used to create RDDs, accumulators and broadcast variables on that
cluster. Here we set it up to use local nodes - the argument
locals[*]
means to use the local machine as the cluster, using as
many worker threads as there are cores. You can also explicitly set the
number of cores with locals[k]
where k
is an integer.
In [1]:
from pyspark import SparkContext
sc = SparkContext(master = 'local[*]')
Number of workers
In [2]:
sc.defaultParallelism
Out[2]:
8
Data in an RDD is distributed across partitions. It is most efficient if data does not have to be transferred across partitions.
In [3]:
sc.defaultMinPartitions
Out[3]:
2
You can check the status of your Spark environment and jobs using the Spark UI.
The RDD (Resilient Distributed Dataset) is a data storage abstraction - you can work with it as though it were single unit, while it may actually be distributed over many nodes in the computing cluster.
A first example¶
Distribute the data set to the workers
In [4]:
xs = sc.parallelize(range(10))
xs
Out[4]:
PythonRDD[1] at RDD at PythonRDD.scala:43
In [7]:
xs.getNumPartitions()
Out[7]:
8
Return all data within each partition as a list.
In [5]:
xs.glom().collect()
Out[5]:
[[0], [1], [2], [3, 4], [5], [6], [7], [8, 9]]
Only keep even numbers
In [9]:
xs = xs.filter(lambda x: x % 2 == 0)
xs
Out[9]:
PythonRDD[3] at RDD at PythonRDD.scala:43
Square all elements
In [10]:
xs = xs.map(lambda x: x**2, xs)
xs
Out[10]:
PythonRDD[4] at RDD at PythonRDD.scala:43
Executee the code and return the final dataset
In [11]:
xs.collect()
Out[11]:
[0, 4, 16, 36, 64]
Reduce also triggers a calculation
In [15]:
xs.reduce(lambda x, y: x+y)
Out[15]:
120
A common Spark idiom chains mutiple functions together¶
In [16]:
(
sc.parallelize(range(10))
.filter(lambda x: x % 2 == 0)
.map(lambda x: x**2)
.collect()
)
Out[16]:
[0, 4, 16, 36, 64]
Actions and transforms¶
A transform maps an RDD to another RDD - it is a lazy operation. To actually perform any work, we need to apply an action.
In [17]:
import numpy as np
In [18]:
x = sc.parallelize(np.random.randint(1, 6, 10))
In [19]:
x.collect()
Out[19]:
[4, 1, 4, 5, 2, 2, 5, 5, 1, 3]
In [20]:
x.take(5)
Out[20]:
[4, 1, 4, 5, 2]
In [21]:
x.first()
Out[21]:
4
In [22]:
x.top(5)
Out[22]:
[5, 5, 5, 4, 4]
In [23]:
x.takeSample(True, 15)
Out[23]:
[4, 3, 4, 1, 5, 2, 3, 1, 4, 4, 2, 1, 5, 4, 5]
In [24]:
x.count()
Out[24]:
10
In [25]:
x.countByValue()
Out[25]:
defaultdict(int, {1: 2, 2: 2, 3: 1, 4: 2, 5: 3})
In [26]:
x.sum()
Out[26]:
32
In [27]:
x.max()
Out[27]:
5
In [28]:
x.mean()
Out[28]:
3.2000000000000002
In [29]:
x.stats()
Out[29]:
(count: 10, mean: 3.2, stdev: 1.53622914957, max: 5.0, min: 1.0)
In [30]:
import os
import shutil
if os.path.exists('data//x'):
shutil.rmtree('data/x')
x.saveAsTextFile('data/x')
In [31]:
!ls data/x
_SUCCESS part-00001 part-00003 part-00005 part-00007
part-00000 part-00002 part-00004 part-00006
In [32]:
!cat data/x/*
4
1
4
5
2
2
5
5
1
3
Fold, redcue and aggregate actions¶
max using reduce
In [33]:
x.reduce(lambda x, y: x if x > y else y)
Out[33]:
5
sum using reduce
In [34]:
x.reduce(lambda x, y: x+y)
Out[34]:
32
sum using fold
In [35]:
x.fold(0, lambda x, y: x+y)
Out[35]:
32
prod using reduce
In [36]:
x.reduce(lambda x, y: x*y)
Out[36]:
24000
prod using fold
In [37]:
x.fold(1, lambda x, y: x*y)
Out[37]:
24000
sum using aggregate
In [38]:
x.aggregate(0, lambda x, y: x + y, lambda x, y: x + y)
Out[38]:
32
count using aggregate
In [39]:
x.aggregate(0, lambda acc, _: acc + 1, lambda x, y: x+y)
Out[39]:
10
mean using aggregate
In [40]:
sum_count = x.aggregate([0,0],
lambda acc, x: (acc[0]+x, acc[1]+1),
lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1]+ acc2[1]))
sum_count[0]/sum_count[1]
Out[40]:
3.2000000000000002
Transforms¶
In [41]:
x = sc.parallelize([1,2,3,4])
y = sc.parallelize([3,3,4,6])
In [42]:
x.map(lambda x: x + 1).collect()
Out[42]:
[2, 3, 4, 5]
In [43]:
x.filter(lambda x: x%3 == 0).collect()
Out[43]:
[3]
Think of flatMap as a map followed by a flatten operation¶
In [44]:
x.flatMap(lambda x: range(x-2, x)).collect()
Out[44]:
[-1, 0, 0, 1, 1, 2, 2, 3]
In [45]:
x.sample(False, 0.5).collect()
Out[45]:
[3]
Set-like transformss¶
In [46]:
y.distinct().collect()
Out[46]:
[3, 4, 6]
In [47]:
x.union(y).collect()
Out[47]:
[1, 2, 3, 4, 3, 3, 4, 6]
In [48]:
x.intersection(y).collect()
Out[48]:
[3, 4]
In [49]:
x.subtract(y).collect()
Out[49]:
[1, 2]
In [50]:
x.cartesian(y).collect()
Out[50]:
[(1, 3),
(1, 3),
(1, 4),
(1, 6),
(2, 3),
(2, 3),
(2, 4),
(2, 6),
(3, 3),
(3, 3),
(3, 4),
(3, 6),
(4, 3),
(4, 3),
(4, 4),
(4, 6)]
Working with key-value pairs¶
RDDs consissting of key-value pairs are required for many Spark operatinos. They can be created by using a function that returns an RDD composed of tuples.
In [51]:
data = [('ann', 1), ('bob', 2)]
In [52]:
rdd = sc.parallelize(data)
In [53]:
rdd.keys().collect()
Out[53]:
['ann', 'bob']
In [54]:
ulysses = sc.textFile('data/Ulysses.txt')
In [55]:
ulysses.take(10)
Out[55]:
['The Project Gutenberg EBook of Ulysses, by James Joyce',
'',
'This eBook is for the use of anyone anywhere at no cost and with',
'almost no restrictions whatsoever. You may copy it, give it away or',
're-use it under the terms of the Project Gutenberg License included',
'with this eBook or online at www.gutenberg.org',
'',
'',
'Title: Ulysses',
'']
In [56]:
import string
def tokenize(line):
table = dict.fromkeys(map(ord, string.punctuation))
return line.translate(table).lower().split()
In [57]:
words = ulysses.flatMap(lambda line: tokenize(line))
words.take(10)
Out[57]:
['the',
'project',
'gutenberg',
'ebook',
'of',
'ulysses',
'by',
'james',
'joyce',
'this']
In [58]:
words = words.map(lambda x: (x, 1))
words.take(10)
Out[58]:
[('the', 1),
('project', 1),
('gutenberg', 1),
('ebook', 1),
('of', 1),
('ulysses', 1),
('by', 1),
('james', 1),
('joyce', 1),
('this', 1)]
In [59]:
counts = words.reduceByKey(lambda x, y: x+y)
counts.take(10)
Out[59]:
[('kyries', 1),
('mobile', 2),
('gasteropod', 1),
('circle', 20),
('calamitous', 1),
('kneecap', 1),
('divers', 6),
('riotously', 1),
('cookies', 1),
('temptations', 1)]
In [60]:
counts.takeOrdered(10, key=lambda x: -x[1])
Out[60]:
[('the', 15107),
('of', 8257),
('and', 7282),
('a', 6553),
('to', 5042),
('in', 4981),
('he', 4033),
('his', 3333),
('i', 2698),
('that', 2621)]
In [61]:
(
ulysses.flatMap(lambda line: tokenize(line))
.map(lambda word: (word, 1))
.reduceByKey(lambda x, y: x + y)
.takeOrdered(10, key=lambda x: -x[1])
)
Out[61]:
[('the', 15107),
('of', 8257),
('and', 7282),
('a', 6553),
('to', 5042),
('in', 4981),
('he', 4033),
('his', 3333),
('i', 2698),
('that', 2621)]
Persisting data¶
The top_word
program will repeat ALL the computations each time we
take an action such as takeOrdered
. We need to persist
or
cahce
the results - they are similar except that persist
gives
more control over how the data is retained.
In [62]:
counts.is_cached
Out[62]:
False
In [63]:
counts.persist()
Out[63]:
PythonRDD[81] at RDD at PythonRDD.scala:43
In [64]:
counts.is_cached
Out[64]:
True
In [65]:
counts.takeOrdered(5, lambda x: -x[1])
Out[65]:
[('the', 15107), ('of', 8257), ('and', 7282), ('a', 6553), ('to', 5042)]
In [66]:
counts.take(5)
Out[66]:
[('kyries', 1),
('mobile', 2),
('gasteropod', 1),
('circle', 20),
('calamitous', 1)]
In [67]:
counts.takeOrdered(5, lambda x: x[0])
Out[67]:
[('0', 2), ('001', 5), ('002', 1), ('003', 2), ('004', 3)]
In [68]:
counts.keys().take(5)
Out[68]:
['kyries', 'mobile', 'gasteropod', 'circle', 'calamitous']
In [69]:
counts.values().take(5)
Out[69]:
[1, 2, 1, 20, 1]
In [70]:
count_dict = counts.collectAsMap()
count_dict['circle']
Out[70]:
20
Using cacche instead of persist¶
In [71]:
counts.unpersist()
Out[71]:
PythonRDD[81] at RDD at PythonRDD.scala:43
In [72]:
counts.is_cached
Out[72]:
False
In [73]:
counts.cache()
Out[73]:
PythonRDD[81] at RDD at PythonRDD.scala:43
In [74]:
counts.is_cached
Out[74]:
True
Merging key, value datasets¶
We will build a second counts key: value RDD from another of Joyce’s works - Portrait of the Artist as a Young Man.
In [79]:
portrait = sc.textFile('data/Portrait.txt')
In [80]:
counts1 = (
portrait.flatMap(lambda line: tokenize(line))
.map(lambda x: (x, 1))
.reduceByKey(lambda x,y: x+y)
)
In [81]:
counts1.persist()
Out[81]:
PythonRDD[97] at RDD at PythonRDD.scala:43
Combine counts for words found in both books¶
In [82]:
joined = counts.join(counts1)
In [83]:
joined.take(5)
Out[83]:
[('mobile', (2, 1)),
('circle', (20, 1)),
('temptations', (1, 4)),
('withering', (4, 1)),
('spoken', (16, 15))]
sum counts over words¶
In [84]:
s = joined.mapValues(lambda x: x[0] + x[1])
s.take(5)
Out[84]:
[('mobile', 3),
('circle', 21),
('temptations', 5),
('withering', 5),
('spoken', 31)]
average counts across books¶
In [85]:
avg = joined.mapValues(lambda x: np.mean(x))
avg.take(5)
Out[85]:
[('mobile', 1.5),
('circle', 10.5),
('temptations', 2.5),
('withering', 2.5),
('spoken', 15.5)]
Version Information¶
In [86]:
%load_ext version_information
%version_information numpy, pyspark
Out[86]:
Software | Version |
---|---|
Python | 3.5.1 64bit [GCC 4.2.1 (Apple Inc. build 5577)] |
IPython | 4.0.3 |
OS | Darwin 15.4.0 x86_64 i386 64bit |
numpy | 1.10.4 |
pyspark | The 'pyspark' distribution was not found and is required by the application |
Mon Apr 18 13:49:17 2016 EDT |
In [ ]: