Introduction to Spark

This lecture introduces the Spark framework for distributed computing, its basic data and control-flow abstractions, and the functional programming style needed to write a Spark application.

  • What problem does Spark solve?
  • SparkContext and the master configuration
  • RDDs
  • Actions
  • Transforms
  • Key-value RDDs
  • Example - word count
  • Persistence
  • Merging key-value RDDs

Learning objectives

  • Overview of Spark
  • Working with Spark RDDs
  • Actions and transforms
  • Working with Spark DataFrames
  • Using the ml and mllib libraries for machine learning

Not covered

  • Spark GraphX (library for graph algorithms)
  • Spark Streaming (library for streaming (microbatch) data)

Installation

You should use the current version of Spark at https://spark.apache.org/downloads.html. Choose the package Pre-built for Hadoop 2.6 and later. The instructions below use the version current as of 11 April 2016.

cd ~
wget http://mirrors.gigenet.com/apache/spark/spark-1.6.1/spark-1.6.1-bin-hadoop2.6.tgz
tar xzf spark-1.6.1-bin-hadoop2.6.tgz
rm spark-1.6.1-bin-hadoop2.6.tgz
mv spark-1.6.1-bin-hadoop2.6 spark

Install the py4j Python package needed for pyspark

pip install py4j

You need to define these environment variables before starting the notebook.

export SPARK_HOME=~/spark
export PYSPARK_PYTHON=python3
export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
export PACKAGES="com.databricks:spark-csv_2.11:1.4.0"
export PYSPARK_SUBMIT_ARGS="--packages ${PACKAGES} pyspark-shell"

In Unix/Mac, this can be done in .bashrc or .bash_profile.
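
As a quick sanity check (a minimal sketch, assuming the variables above are set in the current shell), confirm that pyspark can be imported from Python:

import pyspark                  # resolved via PYTHONPATH -> $SPARK_HOME/python
print(pyspark.__file__)         # should point inside ~/spark/python/pyspark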

For the adventurous, see Running Spark on an AWS EC2 cluster.

Overview of Spark

With massive data, we need to load, extract, transform and analyze the data on multiple computers to overcome I/O and processing bottlenecks. However, when working on multiple computers (possibly hundreds to thousands), there is a high risk of failure in one or more nodes. Distributed computing frameworks are designed to handle failures gracefully, allowing the developer to focus on algorithm development rather than system administration.

The first such widely used open-source framework was Hadoop MapReduce. It provided transparent fault tolerance and popularized the functional programming approach to distributed computing. The Hadoop workflow uses repeated invocations of the following instructions (a toy illustration in plain Python follows the list):

load dataset from disk to memory
map function to elements of dataset
reduce results of map to get new aggregate dataset
save new dataset to disk
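
As a point of reference (a toy sketch in plain Python, with no Hadoop involved), the same map-then-reduce pattern looks like this on a small in-memory list:

from functools import reduce

data = ["spark", "hadoop", "mapreduce"]       # load a (tiny) dataset into memory
mapped = map(len, data)                       # map a function over the elements
total = reduce(lambda x, y: x + y, mapped)    # reduce the mapped values to an aggregate
print(total)                                  # 20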

Hadoop has two main limitations:

  • the repeated saving and loading of data to disk can be slow, and makes interactive development very challenging
  • restriction to only map and reduce constructs results in increased code complexity, since every problem must be tailored to the map-reduce format

Spark is a more recent framework for distributed computing that addresses the limitations of Hadoop by allowing the use of in-memory datasets for iterative computation and by providing a rich set of functional programming constructs to make the developer’s job easier. Spark also provides libraries for common big data tasks, such as running SQL queries, performing machine learning and processing large graphs.

Languages supported

Fully supported

  • Java
  • Scala
  • Python

R support is less complete but available

Architecture of a Spark Application

[Figure: Spark components]

SparkContext

A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster. Here we set it up to use the local machine as the cluster - the argument local[*] means to use as many worker threads as there are cores. You can also set the number of cores explicitly with local[k], where k is an integer.

In [1]:
from pyspark import SparkContext
sc = SparkContext(master = 'local[*]')
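
As a variation (a sketch only - a single Python process can have at most one active SparkContext, so this would replace, not follow, the cell above; the application name 'spark-intro' is just an example), the number of worker threads and the application name can be set explicitly:

from pyspark import SparkContext
sc = SparkContext(master='local[2]', appName='spark-intro')  # exactly 2 worker threads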

Number of workers

In [2]:
sc.defaultParallelism
Out[2]:
8

Data in an RDD is distributed across partitions. It is most efficient if data does not have to be transferred across partitions.

In [3]:
sc.defaultMinPartitions
Out[3]:
2
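
The number of partitions can also be chosen explicitly when an RDD is created (a small sketch using the `sc` defined above; the second argument to parallelize is the number of slices):

rdd = sc.parallelize(range(100), 4)   # ask for 4 partitions explicitly
rdd.getNumPartitions()                # 4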

You can check the status of your Spark environment and jobs using the Spark UI.

The RDD (Resilient Distributed Dataset) is a data storage abstraction - you can work with it as though it were a single unit, while it may actually be distributed over many nodes in the computing cluster.

A first example

Distribute the data set to the workers

In [4]:
xs = sc.parallelize(range(10))
xs
Out[4]:
PythonRDD[1] at RDD at PythonRDD.scala:43
In [7]:
xs.getNumPartitions()
Out[7]:
8

Return all data within each partition as a list.

In [5]:
xs.glom().collect()
Out[5]:
[[0], [1], [2], [3, 4], [5], [6], [7], [8, 9]]

Only keep even numbers

In [9]:
xs = xs.filter(lambda x: x % 2 == 0)
xs
Out[9]:
PythonRDD[3] at RDD at PythonRDD.scala:43

Square all elements

In [10]:
xs = xs.map(lambda x: x**2)
xs
Out[10]:
PythonRDD[4] at RDD at PythonRDD.scala:43

Execute the code and return the final dataset

In [11]:
xs.collect()
Out[11]:
[0, 4, 16, 36, 64]

Reduce also triggers a calculation

In [15]:
xs.reduce(lambda x, y: x+y)
Out[15]:
120

A common Spark idiom chains multiple functions together

In [16]:
(
    sc.parallelize(range(10))
    .filter(lambda x: x % 2 == 0)
    .map(lambda x: x**2)
    .collect()
)
Out[16]:
[0, 4, 16, 36, 64]

Actions and transforms

A transform maps an RDD to another RDD - it is a lazy operation. To actually perform any work, we need to apply an action.
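
To see the laziness (a small sketch using `sc` from above): the transform returns immediately without touching the data, and nothing is computed until an action is applied.

lazy_rdd = sc.parallelize(range(1000)).map(lambda v: v * 2)  # transform only: nothing is computed yet
lazy_rdd          # shows an RDD description, not the data
lazy_rdd.count()  # action: the map is actually executed now -> 1000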

In [17]:
import numpy as np
In [18]:
x = sc.parallelize(np.random.randint(1, 6, 10))
In [19]:
x.collect()
Out[19]:
[4, 1, 4, 5, 2, 2, 5, 5, 1, 3]
In [20]:
x.take(5)
Out[20]:
[4, 1, 4, 5, 2]
In [21]:
x.first()
Out[21]:
4
In [22]:
x.top(5)
Out[22]:
[5, 5, 5, 4, 4]
In [23]:
x.takeSample(True, 15)
Out[23]:
[4, 3, 4, 1, 5, 2, 3, 1, 4, 4, 2, 1, 5, 4, 5]
In [24]:
x.count()
Out[24]:
10
In [25]:
x.countByValue()
Out[25]:
defaultdict(int, {1: 2, 2: 2, 3: 1, 4: 2, 5: 3})
In [26]:
x.sum()
Out[26]:
32
In [27]:
x.max()
Out[27]:
5
In [28]:
x.mean()
Out[28]:
3.2000000000000002
In [29]:
x.stats()
Out[29]:
(count: 10, mean: 3.2, stdev: 1.53622914957, max: 5.0, min: 1.0)
In [30]:
import os
import shutil
if os.path.exists('data/x'):
    shutil.rmtree('data/x')
x.saveAsTextFile('data/x')
In [31]:
!ls data/x
_SUCCESS   part-00001 part-00003 part-00005 part-00007
part-00000 part-00002 part-00004 part-00006
In [32]:
!cat data/x/*
4
1
4
5
2
2
5
5
1
3
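
The saved text files can be read back into an RDD with textFile (a sketch; textFile yields strings, so the values must be converted back to integers):

x2 = sc.textFile('data/x').map(int)   # textFile yields strings; convert back to int
x2.collect()                          # [4, 1, 4, 5, 2, 2, 5, 5, 1, 3] (order follows the part files)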

Fold, reduce and aggregate actions

max using reduce

In [33]:
x.reduce(lambda x, y: x if x > y else y)
Out[33]:
5

sum using reduce

In [34]:
x.reduce(lambda x, y: x+y)
Out[34]:
32

sum using fold

In [35]:
x.fold(0, lambda x, y: x+y)
Out[35]:
32

prod using reduce

In [36]:
x.reduce(lambda x, y: x*y)
Out[36]:
24000

prod using fold

In [37]:
x.fold(1, lambda x, y: x*y)
Out[37]:
24000

sum using aggregate

In [38]:
x.aggregate(0, lambda x, y: x + y, lambda x, y: x + y)
Out[38]:
32
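
For reference (the same computation as above, with the three arguments of aggregate spelled out): aggregate takes a zero value, a function that folds one element into a per-partition accumulator, and a function that merges accumulators from different partitions.

x.aggregate(
    0,                        # zeroValue: starting accumulator in each partition
    lambda acc, v: acc + v,   # seqOp: fold one element into the partition accumulator
    lambda a, b: a + b        # combOp: merge accumulators from different partitions
)                             # 32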

count using aggregate

In [39]:
x.aggregate(0, lambda acc, _: acc + 1, lambda x, y: x+y)
Out[39]:
10

mean using aggregate

In [40]:
sum_count = x.aggregate((0, 0),
                        lambda acc, x: (acc[0] + x, acc[1] + 1),
                        lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))
sum_count[0]/sum_count[1]
Out[40]:
3.2000000000000002

Transforms

In [41]:
x = sc.parallelize([1,2,3,4])
y = sc.parallelize([3,3,4,6])
In [42]:
x.map(lambda x: x + 1).collect()
Out[42]:
[2, 3, 4, 5]
In [43]:
x.filter(lambda x: x%3 == 0).collect()
Out[43]:
[3]

Think of flatMap as a map followed by a flatten operation

In [44]:
x.flatMap(lambda x: range(x-2, x)).collect()
Out[44]:
[-1, 0, 0, 1, 1, 2, 2, 3]
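
To make the contrast with map explicit (a small sketch on the same `x`): map produces one output element per input element, so the lists stay nested, while flatMap concatenates them.

x.map(lambda v: list(range(v - 2, v))).collect()   # [[-1, 0], [0, 1], [1, 2], [2, 3]]
x.flatMap(lambda v: range(v - 2, v)).collect()     # [-1, 0, 0, 1, 1, 2, 2, 3]
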
In [45]:
x.sample(False, 0.5).collect()
Out[45]:
[3]

Set-like transforms

In [46]:
y.distinct().collect()
Out[46]:
[3, 4, 6]
In [47]:
x.union(y).collect()
Out[47]:
[1, 2, 3, 4, 3, 3, 4, 6]
In [48]:
x.intersection(y).collect()
Out[48]:
[3, 4]
In [49]:
x.subtract(y).collect()
Out[49]:
[1, 2]
In [50]:
x.cartesian(y).collect()
Out[50]:
[(1, 3),
 (1, 3),
 (1, 4),
 (1, 6),
 (2, 3),
 (2, 3),
 (2, 4),
 (2, 6),
 (3, 3),
 (3, 3),
 (3, 4),
 (3, 6),
 (4, 3),
 (4, 3),
 (4, 4),
 (4, 6)]

Working with key-value pairs

RDDs consisting of key-value pairs are required for many Spark operations. They can be created with any function that returns an RDD of tuples.

In [51]:
data = [('ann', 1), ('bob', 2)]
In [52]:
rdd = sc.parallelize(data)
In [53]:
rdd.keys().collect()
Out[53]:
['ann', 'bob']
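
A key-value RDD can also be derived from an existing RDD (a small sketch using `sc`): any map that returns 2-tuples works, and keyBy is a convenient shortcut that keeps the original element as the value.

nums = sc.parallelize([1, 2, 3, 4])
nums.map(lambda v: (v % 2, v)).collect()   # [(1, 1), (0, 2), (1, 3), (0, 4)]
nums.keyBy(lambda v: v % 2).collect()      # [(1, 1), (0, 2), (1, 3), (0, 4)]
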
In [54]:
ulysses = sc.textFile('data/Ulysses.txt')
In [55]:
ulysses.take(10)
Out[55]:
['The Project Gutenberg EBook of Ulysses, by James Joyce',
 '',
 'This eBook is for the use of anyone anywhere at no cost and with',
 'almost no restrictions whatsoever.  You may copy it, give it away or',
 're-use it under the terms of the Project Gutenberg License included',
 'with this eBook or online at www.gutenberg.org',
 '',
 '',
 'Title: Ulysses',
 '']
In [56]:
import string
def tokenize(line):
    table = dict.fromkeys(map(ord, string.punctuation))
    return line.translate(table).lower().split()
In [57]:
words = ulysses.flatMap(lambda line: tokenize(line))
words.take(10)
Out[57]:
['the',
 'project',
 'gutenberg',
 'ebook',
 'of',
 'ulysses',
 'by',
 'james',
 'joyce',
 'this']
In [58]:
words = words.map(lambda x: (x, 1))
words.take(10)
Out[58]:
[('the', 1),
 ('project', 1),
 ('gutenberg', 1),
 ('ebook', 1),
 ('of', 1),
 ('ulysses', 1),
 ('by', 1),
 ('james', 1),
 ('joyce', 1),
 ('this', 1)]
In [59]:
counts = words.reduceByKey(lambda x, y: x+y)
counts.take(10)
Out[59]:
[('kyries', 1),
 ('mobile', 2),
 ('gasteropod', 1),
 ('circle', 20),
 ('calamitous', 1),
 ('kneecap', 1),
 ('divers', 6),
 ('riotously', 1),
 ('cookies', 1),
 ('temptations', 1)]
In [60]:
counts.takeOrdered(10, key=lambda x: -x[1])
Out[60]:
[('the', 15107),
 ('of', 8257),
 ('and', 7282),
 ('a', 6553),
 ('to', 5042),
 ('in', 4981),
 ('he', 4033),
 ('his', 3333),
 ('i', 2698),
 ('that', 2621)]
In [61]:
(
ulysses.flatMap(lambda line: tokenize(line))
                .map(lambda word: (word, 1))
               .reduceByKey(lambda x, y: x + y)
               .takeOrdered(10, key=lambda x: -x[1])
)
Out[61]:
[('the', 15107),
 ('of', 8257),
 ('and', 7282),
 ('a', 6553),
 ('to', 5042),
 ('in', 4981),
 ('he', 4033),
 ('his', 3333),
 ('i', 2698),
 ('that', 2621)]

Persisting data

The word count pipeline above repeats ALL of the computation each time we apply an action such as takeOrdered. To avoid this, we need to persist or cache the results - the two are similar, except that persist gives more control over how the data is retained (an example of choosing a storage level explicitly appears at the end of this section).

In [62]:
counts.is_cached
Out[62]:
False
In [63]:
counts.persist()
Out[63]:
PythonRDD[81] at RDD at PythonRDD.scala:43
In [64]:
counts.is_cached
Out[64]:
True
In [65]:
counts.takeOrdered(5, lambda x: -x[1])
Out[65]:
[('the', 15107), ('of', 8257), ('and', 7282), ('a', 6553), ('to', 5042)]
In [66]:
counts.take(5)
Out[66]:
[('kyries', 1),
 ('mobile', 2),
 ('gasteropod', 1),
 ('circle', 20),
 ('calamitous', 1)]
In [67]:
counts.takeOrdered(5, lambda x: x[0])
Out[67]:
[('0', 2), ('001', 5), ('002', 1), ('003', 2), ('004', 3)]
In [68]:
counts.keys().take(5)
Out[68]:
['kyries', 'mobile', 'gasteropod', 'circle', 'calamitous']
In [69]:
counts.values().take(5)
Out[69]:
[1, 2, 1, 20, 1]
In [70]:
count_dict = counts.collectAsMap()
count_dict['circle']
Out[70]:
20

Using cache instead of persist

In [71]:
counts.unpersist()
Out[71]:
PythonRDD[81] at RDD at PythonRDD.scala:43
In [72]:
counts.is_cached
Out[72]:
False
In [73]:
counts.cache()
Out[73]:
PythonRDD[81] at RDD at PythonRDD.scala:43
In [74]:
counts.is_cached
Out[74]:
True
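
To illustrate the extra control that persist offers (a sketch continuing with `counts`; MEMORY_AND_DISK is one of several storage levels defined in pyspark.StorageLevel), the storage level can be chosen explicitly after dropping the current one:

from pyspark import StorageLevel

counts.unpersist()                             # drop the current storage level first
counts.persist(StorageLevel.MEMORY_AND_DISK)   # keep in memory, spill to disk if it does not fit
counts.getStorageLevel()                       # shows the storage level now in use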

Merging key-value datasets

We will build a second key-value RDD of counts from another of Joyce’s works - A Portrait of the Artist as a Young Man.

In [79]:
portrait = sc.textFile('data/Portrait.txt')
In [80]:
counts1 = (
portrait.flatMap(lambda line: tokenize(line))
        .map(lambda x: (x, 1))
        .reduceByKey(lambda x,y: x+y)
)
In [81]:
counts1.persist()
Out[81]:
PythonRDD[97] at RDD at PythonRDD.scala:43

Combine counts for words found in both books

In [82]:
joined = counts.join(counts1)
In [83]:
joined.take(5)
Out[83]:
[('mobile', (2, 1)),
 ('circle', (20, 1)),
 ('temptations', (1, 4)),
 ('withering', (4, 1)),
 ('spoken', (16, 15))]

Sum counts over words

In [84]:
s = joined.mapValues(lambda x: x[0] + x[1])
s.take(5)
Out[84]:
[('mobile', 3),
 ('circle', 21),
 ('temptations', 5),
 ('withering', 5),
 ('spoken', 31)]

Average counts across books

In [85]:
avg = joined.mapValues(lambda x: np.mean(x))
avg.take(5)
Out[85]:
[('mobile', 1.5),
 ('circle', 10.5),
 ('temptations', 2.5),
 ('withering', 2.5),
 ('spoken', 15.5)]
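
Note that join keeps only words that appear in both books. If words unique to one book should be kept as well (a sketch; fullOuterJoin returns None for the missing side), an outer join can be used:

joined_all = counts.fullOuterJoin(counts1)
joined_all.mapValues(lambda v: (v[0] or 0) + (v[1] or 0)).take(5)   # missing counts treated as 0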

Version Information

In [86]:
%load_ext version_information
%version_information numpy, pyspark
Out[86]:
Software   Version
Python     3.5.1 64bit [GCC 4.2.1 (Apple Inc. build 5577)]
IPython    4.0.3
OS         Darwin 15.4.0 x86_64 i386 64bit
numpy      1.10.4
pyspark    The 'pyspark' distribution was not found and is required by the application
Mon Apr 18 13:49:17 2016 EDT