Using Spark Efficiently¶
The focus of this lecture is on Spark constructs that can make your programs more efficient. In general, this means minimizing the amount of data transferred across nodes, since data transfer is usually the bottleneck for big data analysis problems.
- Shared variables
- Accumulators
- Broadcast variables
- DataFrames
- Partitioning and the Spark shuffle
Spark tuning and optimization is complicated - this tutorial only touches on some of the basic concepts.
Don’t forget the other areas of optimization shown in previous notebooks:
- Use DataFrames rather than RDDs
- Use pyspark.sql.functions rather than a Python UDF
- If you use a UDF, see if you can use a vectorized UDF (a sketch contrasting these options follows this list)
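As a reminder of the last two points, here is a minimal sketch (not part of the original notebook) contrasting a plain Python UDF, a built-in column expression, and a vectorized (pandas) UDF. It assumes a SparkSession is available as spark (as created below) and that pandas and pyarrow are installed; the DataFrame and column names are made up for illustration.

import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

df = spark.range(1000).withColumnRenamed("id", "x")  # toy DataFrame

# 1. Plain Python UDF: every value makes a round trip through the Python interpreter (slowest)
plain_udf = F.udf(lambda x: float(x) * 2.0, DoubleType())
df.select(plain_udf("x").alias("y"))

# 2. Built-in column expression: evaluated entirely inside the JVM (fastest)
df.select((F.col("x") * 2.0).alias("y"))

# 3. Vectorized (pandas) UDF: data is transferred in Arrow batches as pandas Series
@pandas_udf(DoubleType())
def times_two(s: pd.Series) -> pd.Series:
    return s * 2.0

df.select(times_two("x").alias("y"))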
In [1]:
%%spark
Starting Spark application
SparkSession available as 'spark'.
In [2]:
import numpy as np
import string
Resources¶
The Spark Shuffle and Partitioning¶
Some events trigger the redistribution of data across partitions, which involves the (expensive) copying of data across executors and machines. This is known as the shuffle. For example, if we do a reduceByKey operation on a key-value pair RDD, Spark needs to collect all pairs with the same key in the same partition to do the reduction.
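One way to see where a shuffle happens (my own addition, not from the lecture) is to inspect an RDD's lineage with toDebugString(); the output should show a ShuffledRDD at the stage boundary that reduceByKey introduces.

pairs = sc.parallelize([('a', 1), ('b', 2), ('a', 3)])
counts = pairs.reduceByKey(lambda x, y: x + y)
# toDebugString() returns bytes in PySpark; look for the ShuffledRDD in the lineage
print(counts.toDebugString().decode())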
For key-value RDDs, you have some control over the partitioning of the RDDs. In particular, you can ask Spark to partition a set of keys so that they are guaranteed to appear together on some node. This can minimize a lot of data transfer. For example, suppose you have a large key-value RDD consisting of user_name: comments from a web user community. Every night, you want to update it with new user comments using a join operation.
In [17]:
def fake_data(n, val):
    users = list(map(''.join, np.random.choice(list(string.ascii_lowercase), (n, 2))))
    comments = [val] * n
    return tuple(zip(users, comments))
In [18]:
data = fake_data(10000, 'a')
list(data)[:10]
[('en', 'a'), ('mg', 'a'), ('sn', 'a'), ('ys', 'a'), ('ov', 'a'), ('xp', 'a'), ('hs', 'a'), ('xx', 'a'), ('nb', 'a'), ('te', 'a')]
In [19]:
rdd = sc.parallelize(data).reduceByKey(lambda x, y: x+y)
In [20]:
new_data = fake_data(1000, 'b')
list(new_data)[:10]
[('bj', 'b'), ('jb', 'b'), ('ue', 'b'), ('oy', 'b'), ('pe', 'b'), ('zt', 'b'), ('jn', 'b'), ('mn', 'b'), ('ph', 'b'), ('zo', 'b')]
In [21]:
rdd_new = sc.parallelize(new_data).reduceByKey(lambda x, y: x+y).cache()
In [22]:
rdd_updated = rdd.join(rdd_new)
In [23]:
rdd_updated.take(10)
[('gs', ('aaaaaaaaaaaaa', 'bbbbb')), ('gg', ('aaaaaaaaaaaaaaa', 'bb')), ('yq', ('aaaaaaaa', 'bb')), ('gc', ('aaaaaaaaaaaaaaaaaa', 'b')), ('go', ('aaaaaaaaaaaaaaa', 'b')), ('gk', ('aaaaaaaaaaaaa', 'b')), ('lf', ('aaaaaaaaaaaaaaaa', 'bb')), ('iq', ('aaaaaaaaaaaaaaaaa', 'bbb')), ('ln', ('aaaaaaaaaaaaaaaaa', 'bb')), ('dr', ('aaaaaaaaaaaaa', 'b'))]
Using partitionBy¶
The join operation will hash all the keys of both rdd and rdd_new, sending keys with the same hashes to the same node for the actual join operation. There is a lot of unnecessary data transfer. Since rdd is a much larger data set than rdd_new, we can instead fix the partitioning of rdd and just transfer the keys of rdd_new. This is done by rdd.partitionBy(numPartitions), where numPartitions should be at least twice the number of cores.
From the R docs for partitionBy:
This function operates on RDDs where every element is of the form list(K, V) or c(K, V). For each element of this RDD, the partitioner is used to compute a hash function and the RDD is partitioned using this hash value.
In other words, which partition a data element is sent to depends on the key value.
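Concretely, the default partitioner is a hash partitioner: a key is sent to partition portable_hash(key) % numPartitions. The short sketch below is my own addition (with made-up keys) and shows that partitionBy also accepts a custom partition function if you want to control placement yourself.

# The default partitionFunc is pyspark.rdd.portable_hash; here we supply our own,
# made-up policy that partitions by the first letter of the key.
rdd_custom = (sc.parallelize([('apple', 1), ('avocado', 2), ('banana', 3)])
                .partitionBy(2, partitionFunc=lambda k: ord(k[0]) % 2))
for part in rdd_custom.glom().collect():
    print(part)  # 'apple' and 'avocado' should land in the same partition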
In [24]:
rdd_A = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
for item in rdd_A.partitionBy(4).glom().collect():
    print(item)
[(4, 4), (4, 4)]
[(1, 1), (1, 1)]
[(2, 2), (2, 2)]
[(3, 3)]
In [25]:
rdd_B = sc.parallelize([(4,'a'), (1,'b'), (2, 'c'), (3, 'd'), (4,'e'), (1, 'f')])
In [26]:
for item in rdd_B.glom().collect():
    print(item)
[(4, 'a')]
[(1, 'b'), (2, 'c')]
[(3, 'd')]
[(4, 'e'), (1, 'f')]
In [27]:
rdd_comb = rdd_A.join(rdd_B).glom()
Note: See how all the items from rdd_B have been transferred to the partitions created by rdd_A, but the items from rdd_A have not moved. If rdd_A is much larger than rdd_B, then this minimizes the amount of data transfer.
In [28]:
for item in rdd_comb.collect():
    print(item)
[]
[(1, (1, 'f')), (1, (1, 'b')), (1, (1, 'f')), (1, (1, 'b'))]
[(2, (2, 'c')), (2, (2, 'c'))]
[(3, (3, 'd'))]
[(4, (4, 'a')), (4, (4, 'e')), (4, (4, 'a')), (4, (4, 'e'))]
[]
[]
[]
Applying this to our word counts
In [29]:
rdd2 = sc.parallelize(data).reduceByKey(lambda x, y: x+y)
rdd2 = rdd2.partitionBy(10).cache()
In [30]:
rdd2_updated = rdd2.join(rdd_new)
In [31]:
rdd2_updated.take(10)
[('zn', ('aaaaaaaaa', 'bb')), ('cm', ('aaaaaaaaaaaaa', 'b')), ('vh', ('aaaaaaaaaaaaaaaaaaaa', 'b')), ('eg', ('aaaaaaaaaaaaaaa', 'bbb')), ('xf', ('aaaaaaaaaa', 'b')), ('gy', ('aaaaaa', 'bbbb')), ('aq', ('aaaaaaaaaaaaaaaaa', 'b')), ('ik', ('aaaaaaaaaaaaaaaaaaaaaaa', 'b')), ('vv', ('aaaaaaaaa', 'b')), ('eu', ('aaaaaaaaaa', 'b'))]
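As a quick check (my own addition), you can inspect the partitioning of the inputs and of the joined result; the partitioner reported for the result depends on your Spark configuration.

print(rdd2.getNumPartitions(), rdd2.partitioner)
print(rdd_new.getNumPartitions(), rdd_new.partitioner)
print(rdd2_updated.getNumPartitions(), rdd2_updated.partitioner)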
In [32]:
spark.stop()