Spark Low Level API¶
Outline¶
Foundations
Functional programming (Spark RDDs)
Use of pandas (Spark DataFrame)
Use of SQL (Spark SQL)
Catalyst optimizer
Analysis
DataFrame
SQL AST
Logical plan
Catalog resolution of names
Rule-based optimization
Boolean simplification
Predicate pushdown
Constant folding
Projection pruning
Physical plan
Convert to RDD
Cost-based optimization
Code generation
Project Tungsten
Performs role of compiler
Generates Java bytecode
Resilient distributed datasets (RDD)
In-memory distributed collections
Removes need to write to disk for fault tolerance
Resilience
Reconstruct from lineage
Immutable
Distributed
RDD data live in one or more partitions
Dataset
Consists of records
Each partition consists of a distinct set of records that can be operated on independently
Shared-nothing philosophy
Loading data into RDDs
Programmatically
range()
parallelize()
From file
Compression
Splittable and non-splittable formats
Non-splittable files cannot be distributed
Splittable formats - LZO, Snappy
Non-splittable formats - gzip, zip
Data locality
Worker partitions from nearby DFS partitions
Default partition size is 128 MB
Local file system
Networked filesystem
Distributed filesystem
textFile()
wholeTextFiles()
From data resource
From stream
Persistence
persist()
cache()
Types of RDDs
Base RDD
Pair RDD
Double RDD
Many others
Base RDD
Narrow transformations
map()
filter()
flatMap()
distinct()
Wide transformations
reduce()
groupBy()
sortBy()
join()
Actions
count()
take()
takeOrdered()
top()
collect()
saveAsTextFile()
first()
reduce()
fold()
aggregate()
foreach()
Pair RDD
Dictionary functions
keys()
values()
keyBy()
Functional transformations
mapValues()
flatMapValues()
Grouping, sorting and aggregation
groupByKey()
reduceByKey()
foldByKey()
sortByKey()
Joins
Join large by small
join()
leftOuterJoin()
rightOuterJoin()
fullOuterJoin()
cogroup()
cartesian()
Set operations
union()
intersection()
subtract()
subtractByKey()
Numeric RDD
min()
max()
sum()
mean()
stdev()
variance()
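The Catalyst machinery outlined above (analysis, logical and physical plans, Tungsten code generation) applies to DataFrames and Spark SQL rather than to the raw RDD API covered in this notebook. As a minimal sketch (assuming the SparkSession named spark created further below), you can inspect the plans Catalyst produces with explain():
# Sketch only - uses the SparkSession `spark` defined later in this notebook
df = spark.range(1000).filter("id % 2 == 0").selectExpr("id * 2 as doubled")
df.explain(True)  # prints the parsed, analyzed and optimized logical plans, and the physical plan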
Architecture of a Spark Application¶
Big picture¶
You type your commands in a local Spark session, and the SparkContext takes care of running your instructions distributed across the workers (executors) on a cluster. Each executor can have one or more CPU cores and its own memory cache, and is responsible for handling its own distributed tasks. Communication between the local driver and the workers, and between worker and worker, is handled by a cluster manager.
Source: http://spark.apache.org/docs/latest/img/cluster-overview.png
Organization of Spark tasks¶
Spark organizes tasks that can be performed without exchanging data across partitions into stages. The sequence of tasks to be performed is laid out as a Directed Acyclic Graph (DAG). Tasks are differentiated into transforms (lazy evaluation - they just add to the DAG) and actions (eager evaluation - they execute the specified path in the DAG). Note that calculations are not cached unless requested. Hence, if you trigger the action after RDD3 in the figure and then trigger the action after RDD6, RDD2 will be generated from RDD1 twice. We can avoid the re-calculation by persisting or caching RDD2.
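As a small sketch of this behaviour (assuming a SparkContext sc like the one created below, and mirroring the RDD names in the figure):
rdd1 = sc.parallelize(range(1000))
rdd2 = rdd1.map(lambda x: x * x)           # transform: just added to the DAG
rdd3 = rdd2.filter(lambda x: x % 3 == 0)
rdd6 = rdd2.map(lambda x: x + 1)
rdd3.count()   # action: rdd2 is computed from rdd1
rdd6.count()   # action: rdd2 is recomputed from rdd1 a second time
rdd2.cache()   # or rdd2.persist(); subsequent actions would reuse the cached rdd2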
SparkContext¶
A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster. Here we set it up to use local nodes - the argument local[*] means to use the local machine as the cluster, using as many worker threads as there are cores. You can also explicitly set the number of cores with local[k], where k is an integer.
From Spark 2.0 onwards, there is also a SparkSession that manages DataFrames, which is the preferred abstraction for working in Spark. However, DataFrames are composed of RDDs, and it is still necessary to understand how to use and manipulate RDDs for low-level operations.
Depending on your setup, you may have to import SparkContext. This is not necessary in our Docker containers as we will be using livy.
Start spark
[1]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
[2]:
from pyspark.sql import SparkSession
spark = (
SparkSession.builder
.master("local")
.appName("BIOS-823")
.config("spark.executor.cores", 4)
.getOrCreate()
)
Use this instead if using Spark on vm-manage containers.
%%spark
[3]:
import numpy as np
Version
[4]:
sc.version
[4]:
'3.0.1'
Number of workers
[5]:
sc.defaultParallelism
[5]:
12
Data in an RDD is distributed across partitions. It is most efficient if data does not have to be transferred across partitions. We can see the default minimum number of partitions here, and the actual number in an RDD later.
[6]:
sc.defaultMinPartitions
[6]:
2
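You can also control the number of partitions yourself - a sketch (using the sc created above):
ys = sc.parallelize(range(100), numSlices=4)  # ask for 4 partitions explicitly
ys.getNumPartitions()                         # 4
ys = ys.repartition(8)                        # full shuffle into 8 partitions
ys = ys.coalesce(2)                           # reduce to 2 partitions, avoiding a full shuffle
ys.getNumPartitions()                         # 2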
Resilient Distributed Datasets (RDD)¶
Creating an RDD¶
The RDD (Resilient Distributed Dataset) is a data storage abstraction - you can work with it as though it were single unit, while it may actually be distributed over many nodes in the computing cluster.
A first example¶
Distribute the data set to the workers
[7]:
xs = sc.parallelize(range(10))
xs
[7]:
PythonRDD[1] at RDD at PythonRDD.scala:53
[8]:
xs.getNumPartitions()
[8]:
12
Return all data within each partition as a list. The glom() operation itself runs on the distributed workers and does not move data between partitions; it is the collect() that brings everything back to the driver.
[8]:
xs.glom().collect()
[8]:
[[], [0], [1], [2], [3], [4], [], [5], [6], [7], [8], [9]]
Only keep even numbers
[9]:
xs = xs.filter(lambda x: x % 2 == 0)
xs
[9]:
PythonRDD[3] at RDD at PythonRDD.scala:53
Square all elements
[10]:
xs = xs.map(lambda x: x**2)
xs
[10]:
PythonRDD[4] at RDD at PythonRDD.scala:53
Execute the code and return the final dataset
[11]:
xs.collect()
[11]:
[0, 4, 16, 36, 64]
Reduce also triggers a calculation
[12]:
xs.reduce(lambda x, y: x+y)
[12]:
120
A common Spark idiom chains multiple functions together¶
[13]:
(
sc.parallelize(range(10))
.filter(lambda x: x % 2 == 0)
.map(lambda x: x**2)
.collect()
)
[13]:
[0, 4, 16, 36, 64]
Actions and transforms¶
A transform maps an RDD to another RDD - it is a lazy operation. To actually perform any work, we need to apply an action.
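Because transforms are lazy, chaining them only builds up a lineage (the DAG); nothing runs until an action is called. A small sketch - you can inspect the lineage Spark has recorded with toDebugString():
rdd = sc.parallelize(range(10)).filter(lambda x: x % 2 == 0).map(lambda x: x**2)
print(rdd.toDebugString().decode())  # shows the chain of RDDs this result depends on (decode() since PySpark returns bytes)
rdd.collect()                        # the action that actually triggers the computation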
Actions¶
[14]:
x = sc.parallelize(np.random.randint(1, 6, 10))
[15]:
x.collect()
[15]:
[3, 1, 1, 2, 2, 5, 3, 2, 5, 2]
[16]:
x.take(5)
[16]:
[3, 1, 1, 2, 2]
[17]:
x.first()
[17]:
3
[18]:
x.top(5)
[18]:
[5, 5, 3, 3, 2]
[19]:
x.takeSample(True, 15)
[19]:
[1, 3, 1, 2, 3, 2, 2, 2, 1, 2, 3, 2, 3, 5, 5]
[20]:
x.count()
[20]:
10
[21]:
x.distinct().collect()
[21]:
[1, 2, 3, 5]
[22]:
x.countByValue()
[22]:
defaultdict(int, {3: 2, 1: 2, 2: 4, 5: 2})
[23]:
x.sum()
[23]:
26
[24]:
x.max()
[24]:
5
[25]:
x.mean()
[25]:
2.6
[26]:
x.stats()
[26]:
(count: 10, mean: 2.6, stdev: 1.3564659966250538, max: 5.0, min: 1.0)
Reduce, fold and aggregate actions¶
From the API:
reduce(f)
Reduces the elements of this RDD using the specified commutative and associative binary operator. Currently reduces partitions locally.
fold(zeroValue, op)
Aggregate the elements of each partition, and then the results for all the partitions, using a given associative function and a neutral “zero value.”
The function op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2.
This behaves somewhat differently from fold operations implemented for non-distributed collections in functional languages like Scala. This fold operation may be applied to partitions individually, and then fold those results into the final result, rather than apply the fold to each element sequentially in some defined ordering. For functions that are not commutative, the result may differ from that of a fold applied to a non-distributed collection.
aggregate(zeroValue, seqOp, combOp)
Aggregate the elements of each partition, and then the results for all the partitions, using a given combine functions and a neutral “zero value.”
The functions op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2.
The first function (seqOp) can return a different result type, U, than the type of this RDD. Thus, we need one operation for merging a T into an U and one operation for merging two U
Notes:
All 3 operations take a binary op with signature op(accumulator, operand)
[27]:
x = sc.parallelize(np.random.randint(1, 10, 12))
[28]:
x.collect()
[28]:
[2, 3, 2, 3, 3, 2, 9, 4, 5, 6, 4, 4]
max using reduce
[29]:
x.reduce(lambda x, y: x if x > y else y)
[29]:
9
sum using reduce
[30]:
x.reduce(lambda x, y: x+y)
[30]:
47
sum using fold
[31]:
x.fold(0, lambda x, y: x+y)
[31]:
47
prod using reduce
[32]:
x.reduce(lambda x, y: x*y)
[32]:
3732480
prod using fold
[33]:
x.fold(1, lambda x, y: x*y)
[33]:
3732480
sum using aggregate
[34]:
x.aggregate(0, lambda x, y: x + y, lambda x, y: x + y)
[34]:
47
count using aggregate
[35]:
x.aggregate(0, lambda acc, _: acc + 1, lambda x, y: x+y)
[35]:
12
mean using aggregate
[36]:
sum_count = x.aggregate([0,0],
lambda acc, x: (acc[0]+x, acc[1]+1),
lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1]+ acc2[1]))
sum_count[0]/sum_count[1]
[36]:
3.9166666666666665
Warning: Be very careful with fold and aggregate - the zero value must be “neutral”. The behavior can be different from Python’s reduce with an initial value. This is because there are two levels of operations that use the zero value - first locally (within each partition), then globally (when the partition results are combined).
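For example (a sketch, assuming the RDD is created with exactly 2 partitions), a non-neutral zero value is added once per partition and once more in the final merge:
nums = sc.parallelize([1, 2, 3, 4], 2)   # force 2 partitions
nums.fold(0, lambda x, y: x + y)         # 10, as expected
nums.fold(10, lambda x, y: x + y)        # data sum 10, plus the zero value used 3 times (2 partitions + final merge) = 40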
[37]:
xs = x.collect()
[38]:
xs = np.array(xs)
[39]:
xs
[39]:
array([2, 3, 2, 3, 3, 2, 9, 4, 5, 6, 4, 4])
[40]:
sum(xs)
[40]:
47
Exercise: Explain the results shown below:
[41]:
from functools import reduce
[42]:
reduce(lambda x, y: x + y, xs, 1)
[42]:
48
[43]:
x.fold(1, lambda acc, val: acc + val)
[43]:
60
[44]:
x.aggregate(1, lambda x, y: x + y, lambda x, y: x + y)
[44]:
60
Exercise: Explain the results shown below:
[45]:
reduce(lambda x, y: x + y**2, xs, 0)
[45]:
229
[46]:
np.sum(xs**2)
[46]:
229
[47]:
x.fold(0, lambda x, y: x + y**2)
[47]:
9541
[48]:
x.aggregate(0, lambda x, y: x + y**2, lambda x, y: x + y)
[48]:
229
Exercise: Explain the results shown below:
[49]:
x.fold([], lambda acc, val: acc + [val])
[49]:
[[2], [3], [2], [3], [3], [2], [9], [4], [5], [6], [4], [4]]
[50]:
seqOp = lambda acc, val: acc + [val]
combOp = lambda acc, val: acc + val
x.aggregate([], seqOp, combOp)
[50]:
[2, 3, 2, 3, 3, 2, 9, 4, 5, 6, 4, 4]
Transforms¶
[51]:
x = sc.parallelize([1,2,3,4])
y = sc.parallelize([3,3,4,6])
[52]:
x.map(lambda x: x + 1).collect()
[52]:
[2, 3, 4, 5]
[53]:
x.filter(lambda x: x%3 == 0).collect()
[53]:
[3]
Think of flatMap as a map followed by a flatten operation¶
[54]:
x.flatMap(lambda x: range(x-2, x)).collect()
[54]:
[-1, 0, 0, 1, 1, 2, 2, 3]
[55]:
x.sample(False, 0.5).collect()
[55]:
[2, 3, 4]
Set-like transforms¶
[56]:
y.distinct().collect()
[56]:
[3, 4, 6]
[57]:
x.union(y).collect()
[57]:
[1, 2, 3, 4, 3, 3, 4, 6]
[58]:
x.intersection(y).collect()
[58]:
[3, 4]
[59]:
x.subtract(y).collect()
[59]:
[1, 2]
[60]:
x.cartesian(y).collect()
[60]:
[(1, 3),
(1, 3),
(1, 4),
(1, 6),
(2, 3),
(2, 3),
(2, 4),
(2, 6),
(3, 3),
(3, 3),
(3, 4),
(3, 6),
(4, 3),
(4, 3),
(4, 4),
(4, 6)]
Note that flatMap gets rid of empty lists, and is a good way to ignore “missing” or “malformed” entries.
[61]:
def conv(x):
try:
return [float(x)]
except:
return []
[62]:
s = "Thee square root of 3 is less than 3.14 unless you divide by 0".split()
x = sc.parallelize(s)
x.collect()
[62]:
['Thee',
'square',
'root',
'of',
'3',
'is',
'less',
'than',
'3.14',
'unless',
'you',
'divide',
'by',
'0']
[63]:
x.map(conv).collect()
[63]:
[[], [], [], [], [3.0], [], [], [], [3.14], [], [], [], [], [0.0]]
[64]:
x.flatMap(conv).collect()
[64]:
[3.0, 3.14, 0.0]
Working with key-value pairs¶
RDDs consisting of key-value pairs are required for many Spark operations. They can be created by using a function that returns an RDD composed of tuples.
[65]:
data = [('ann', 'spring', 'math', 98),
('ann', 'fall', 'bio', 50),
('bob', 'spring', 'stats', 100),
('bob', 'fall', 'stats', 92),
('bob', 'summer', 'stats', 100),
('charles', 'spring', 'stats', 88),
('charles', 'fall', 'bio', 100)
]
[66]:
rdd = sc.parallelize(data)
[67]:
rdd.keys().collect()
[67]:
['ann', 'ann', 'bob', 'bob', 'bob', 'charles', 'charles']
[68]:
rdd.collect()
[68]:
[('ann', 'spring', 'math', 98),
('ann', 'fall', 'bio', 50),
('bob', 'spring', 'stats', 100),
('bob', 'fall', 'stats', 92),
('bob', 'summer', 'stats', 100),
('charles', 'spring', 'stats', 88),
('charles', 'fall', 'bio', 100)]
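The outline above also mentions keyBy(), which builds key-value pairs by computing a key for each record and keeping the whole record as the value - a small sketch with the same data:
rdd.keyBy(lambda x: x[0]).take(2)
# pairs of (name, full record), e.g. ('ann', ('ann', 'spring', 'math', 98))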
Sum values by key
[69]:
(
rdd.
map(lambda x: (x[0], x[3])).
reduceByKey(lambda x, y: x + y).
collect()
)
[69]:
[('charles', 188), ('ann', 148), ('bob', 292)]
Running list of values by key
[70]:
(
rdd.
map(lambda x: ((x[0], x[3]))).
aggregateByKey([], lambda x, y: x + [y], lambda x, y: x + y).
collect()
)
[70]:
[('charles', [88, 100]), ('ann', [98, 50]), ('bob', [100, 92, 100])]
Average by key
[71]:
(
rdd.
map(lambda x: ((x[0], x[3]))).
aggregateByKey([], lambda x, y: x + [y], lambda x, y: x + y).
map(lambda x: (x[0], sum(x[1])/len(x[1]))).
collect()
)
[71]:
[('charles', 94.0), ('ann', 74.0), ('bob', 97.33333333333333)]
Using a different key
[72]:
(
rdd.
map(lambda x: ((x[2], x[3]))).
aggregateByKey([], lambda x, y: x + [y], lambda x, y: x + y).
map(lambda x: (x[0], sum(x[1])/len(x[1]))).
collect()
)
[72]:
[('bio', 75.0), ('math', 98.0), ('stats', 95.0)]
Using key-value pairs to find most frequent words in Ulysses¶
Note: This part assumes that we are using HDFS.
If you want to install locally, there are tutorials on the web. For Macbook I followed this guide.
[73]:
hadoop = sc._jvm.org.apache.hadoop
fs = hadoop.fs.FileSystem
conf = hadoop.conf.Configuration()
path = hadoop.fs.Path('./data/texts')
for f in fs.get(conf).listStatus(path):
print(f.getPath())
hdfs://localhost:9000/user/cliburnchan/data/texts/Portrait.txt
hdfs://localhost:9000/user/cliburnchan/data/texts/Ulysses.txt
hdfs://localhost:9000/user/cliburnchan/data/texts/looking_glass.txt
[74]:
ulysses = sc.textFile('./data/texts/Portrait.txt')
Note that we can also read in entire docs as the values.
[75]:
docs = sc.wholeTextFiles('./data/texts/')
docs.keys().collect()
[75]:
['hdfs://localhost:9000/user/cliburnchan/data/texts/Portrait.txt',
'hdfs://localhost:9000/user/cliburnchan/data/texts/Ulysses.txt',
'hdfs://localhost:9000/user/cliburnchan/data/texts/looking_glass.txt']
[76]:
ulysses.take(10)
[76]:
["Project Gutenberg's A Portrait of the Artist as a Young Man, by James Joyce",
'',
'This eBook is for the use of anyone anywhere at no cost and with',
'almost no restrictions whatsoever. You may copy it, give it away or',
're-use it under the terms of the Project Gutenberg License included',
'with this eBook or online at www.gutenberg.net',
'',
'',
'Title: A Portrait of the Artist as a Young Man',
'']
[77]:
import string
def tokenize(line):
table = dict.fromkeys(map(ord, string.punctuation))
return line.translate(table).lower().split()
[78]:
words = ulysses.flatMap(lambda line: tokenize(line))
words.take(10)
[78]:
['project',
'gutenbergs',
'a',
'portrait',
'of',
'the',
'artist',
'as',
'a',
'young']
[79]:
words = words.map(lambda x: (x, 1))
words.take(10)
[79]:
[('project', 1),
('gutenbergs', 1),
('a', 1),
('portrait', 1),
('of', 1),
('the', 1),
('artist', 1),
('as', 1),
('a', 1),
('young', 1)]
[80]:
counts = words.reduceByKey(lambda x, y: x+y)
counts.take(10)
[80]:
[('project', 87),
('portrait', 6),
('of', 3268),
('artist', 18),
('as', 484),
('young', 53),
('james', 5),
('this', 216),
('ebook', 10),
('is', 470)]
[81]:
counts.takeOrdered(10, key=lambda x: -x[1])
[81]:
[('the', 6082),
('and', 3439),
('of', 3268),
('a', 2009),
('to', 2008),
('he', 1854),
('his', 1743),
('in', 1592),
('was', 1067),
('that', 961)]
Word count chained version¶
[82]:
(
ulysses.flatMap(lambda line: tokenize(line))
.map(lambda word: (word, 1))
.reduceByKey(lambda x, y: x + y)
.takeOrdered(10, key=lambda x: -x[1])
)
[82]:
[('the', 6082),
('and', 3439),
('of', 3268),
('a', 2009),
('to', 2008),
('he', 1854),
('his', 1743),
('in', 1592),
('was', 1067),
('that', 961)]
Avoiding slow Python UDF tokenize¶
We will see how to do this in the DataFrames notebook.
CountByValue Action¶
If you are sure that the results will fit into memory, you can get a dictionary of counts more easily.
[83]:
wc = (
ulysses.
flatMap(lambda line: tokenize(line)).
countByValue()
)
[84]:
wc['the']
[84]:
6082
Persisting data¶
The word count pipeline above will repeat ALL the computations each time we take an action such as takeOrdered. We need to persist or cache the results - they are similar except that persist gives more control over how the data is retained.
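A sketch of persisting with an explicit StorageLevel (the default for cache() on an RDD is MEMORY_ONLY); it is not run as part of this notebook, since an RDD's storage level can only be set once:
from pyspark import StorageLevel
counts.unpersist()                            # clear any existing storage level first
counts.persist(StorageLevel.MEMORY_AND_DISK)  # spill partitions to disk if they do not fit in memory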
[85]:
counts.is_cached
[85]:
False
[86]:
counts.persist()
[86]:
PythonRDD[117] at RDD at PythonRDD.scala:53
[87]:
counts.is_cached
[87]:
True
[88]:
counts.takeOrdered(5, lambda x: -x[1])
[88]:
[('the', 6082), ('and', 3439), ('of', 3268), ('a', 2009), ('to', 2008)]
[89]:
counts.take(5)
[89]:
[('project', 87), ('portrait', 6), ('of', 3268), ('artist', 18), ('as', 484)]
[90]:
counts.takeOrdered(5, lambda x: x[0])
[90]:
[('1', 7), ('10', 1), ('11', 1), ('13', 1), ('14', 1)]
[91]:
counts.keys().take(5)
[91]:
['project', 'portrait', 'of', 'artist', 'as']
[92]:
counts.values().take(5)
[92]:
[87, 6, 3268, 18, 484]
[93]:
count_dict = counts.collectAsMap()
count_dict['circle']
[93]:
1
[94]:
counts.unpersist()
[94]:
PythonRDD[117] at RDD at PythonRDD.scala:53
[95]:
counts.is_cached
[95]:
False
[96]:
counts.cache()
[96]:
PythonRDD[117] at RDD at PythonRDD.scala:53
[97]:
counts.is_cached
[97]:
True
Merging key, value datasets¶
We will build a second counts key: value RDD from another of Joyce’s works - Portrait of the Artist as a Young Man.
[98]:
portrait = sc.textFile('./data/texts/Portrait.txt')
[99]:
counts1 = (
portrait.flatMap(lambda line: tokenize(line))
.map(lambda x: (x, 1))
.reduceByKey(lambda x,y: x+y)
)
[100]:
counts1.persist()
[100]:
PythonRDD[129] at RDD at PythonRDD.scala:53
Combine counts for words found in both books¶
[101]:
joined = counts.join(counts1)
[102]:
joined.take(5)
[102]:
[('project', (87, 87)),
('portrait', (6, 6)),
('of', (3268, 3268)),
('artist', (18, 18)),
('as', (484, 484))]
sum counts over words¶
[103]:
s = joined.mapValues(lambda x: x[0] + x[1])
s.take(5)
[103]:
[('project', 174), ('portrait', 12), ('of', 6536), ('artist', 36), ('as', 968)]
average counts across books¶
[104]:
avg = joined.mapValues(lambda x: np.mean(x))
avg.take(5)
[104]:
[('project', 87.0),
('portrait', 6.0),
('of', 3268.0),
('artist', 18.0),
('as', 484.0)]
[ ]: