{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "Using Spark Efficiently\n", "====\n", "\n", "Focus in this lecture is on Spark constructs that can make your programs more efficient. In general, this means minimizing the amount of data transfer across nodes, since this is usually the bottleneck for big data analysis problems.\n", "\n", "- Shared variables\n", " - Accumulators\n", " - Broadcast variables\n", "- DataFrames\n", "- Partitioning and the Spark shuffle\n", "- Piping to external programs\n", "\n", "Spark tuning and optimization is complicated - this tutorial only touches on some of the basic concepts." ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "collapsed": true }, "outputs": [], "source": [ "import numpy as np\n", "import string" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "collapsed": false }, "outputs": [], "source": [ "from pyspark import SparkContext\n", "sc = SparkContext('local[*]')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Resources\n", "----\n", "\n", "[The Spark Programming Guide](http://spark.apache.org/docs/latest/programming-guide.html)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Accumulators\n", "----\n", "\n", "Spark functions such as `map` can use variables defined in the driver program, but they make local copies of the variable that are not passed back to the driver program. Accumulators are *shared variable* that allow the aggregation of results from workers back to the driver program, for example, as an event counter. Suppose we want to count the number of rows of data with missing information. The most efficient way is to use an **accumulator**." ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "collapsed": true }, "outputs": [], "source": [ "ulysses = sc.textFile('data/Ulysses.txt')" ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "['The Project Gutenberg EBook of Ulysses, by James Joyce',\n", " '',\n", " 'This eBook is for the use of anyone anywhere at no cost and with',\n", " 'almost no restrictions whatsoever. You may copy it, give it away or',\n", " 're-use it under the terms of the Project Gutenberg License included',\n", " 'with this eBook or online at www.gutenberg.org',\n", " '',\n", " '',\n", " 'Title: Ulysses',\n", " '']" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "ulysses.take(10)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Event counting\n", "\n", "Notice that we have some empty lines. We want to count the number of non-empty lines." 
] }, { "cell_type": "code", "execution_count": 5, "metadata": { "collapsed": true }, "outputs": [], "source": [ "num_lines = sc.accumulator(0)\n", "\n", "def tokenize(line):\n", " table = dict.fromkeys(map(ord, string.punctuation))\n", " return line.translate(table).lower().strip().split()\n", "\n", "def tokenize_count(line):\n", " global num_lines\n", " \n", " if line:\n", " num_lines += 1\n", "\n", " return tokenize(line)" ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "collapsed": false }, "outputs": [], "source": [ "counter = ulysses.flatMap(lambda line: tokenize_count(line)).countByValue()" ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "20" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "counter['circle']" ] }, { "cell_type": "code", "execution_count": 8, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "25396" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "num_lines.value" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Broadcast Variables\n", "----\n", "\n", "Sometimes we need to send a large read only variable to all workers. For example, we might want to share a large feature matrix to all workers as a part of a machine learning application. This same variable will be sent separately for each parallel operation unless you use a **broadcast variable**. Also, the default variable passing mechanism is optimized for small variables and can be slow when the variable is large." ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "collapsed": false }, "outputs": [], "source": [ "from itertools import count\n", "\n", "table = dict(zip(string.ascii_letters, count()))" ] }, { "cell_type": "code", "execution_count": 10, "metadata": { "collapsed": false }, "outputs": [], "source": [ "def weight_first(line, table):\n", " words = tokenize(line)\n", " return sum(table.get(word[0], 0) for word in words if word.isalpha())\n", "\n", "def weight_last(line, table):\n", " words = tokenize(line)\n", " return sum(table.get(word[-1], 0) for word in words if word.isalpha())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### The dictionary `table` is sent out twice to worker nodes, one for each call" ] }, { "cell_type": "code", "execution_count": 11, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "2941855" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "ulysses.map(lambda line: weight_first(line, table)).sum()" ] }, { "cell_type": "code", "execution_count": 12, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "2995994" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "ulysses.map(lambda line: weight_last(line, table)).sum()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Converting to use broadast variables is simple and more efficient\n", "\n", "- Use SparkContext.broadcast() to create a broadcast variable\n", "- Where you would use var, use var.value\n", "- The broadcast variabel is sent once to each node and can be re-used" ] }, { "cell_type": "code", "execution_count": 13, "metadata": { "collapsed": true }, "outputs": [], "source": [ "table_bc = sc.broadcast(table)" ] }, { "cell_type": "code", "execution_count": 14, "metadata": { "collapsed": true }, "outputs": [], "source": [ "def 
] }, { "cell_type": "code", "execution_count": 13, "metadata": { "collapsed": true }, "outputs": [], "source": [ "table_bc = sc.broadcast(table)" ] }, { "cell_type": "code", "execution_count": 14, "metadata": { "collapsed": true }, "outputs": [], "source": [ "def weight_first_bc(line, table):\n", "    words = tokenize(line)\n", "    return sum(table.value.get(word[0], 0) for word in words if word.isalpha())\n", "\n", "def weight_last_bc(line, table):\n", "    words = tokenize(line)\n", "    return sum(table.value.get(word[-1], 0) for word in words if word.isalpha())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### table_bc is sent to nodes only once.\n", "\n", "Although it looks like `table_bc` is being passed to each function, what is shipped with each task is only a small reference to the broadcast data. Each worker checks whether it already has the data cached locally and only fetches it if it does not." ] }, { "cell_type": "code", "execution_count": 15, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "2941855" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "ulysses.map(lambda line: weight_first_bc(line, table_bc)).sum()" ] }, { "cell_type": "code", "execution_count": 16, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "2995994" ] }, "execution_count": 16, "metadata": {}, "output_type": "execute_result" } ], "source": [ "ulysses.map(lambda line: weight_last_bc(line, table_bc)).sum()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The Spark Shuffle and Partitioning\n", "----\n", "\n", "Some operations trigger a redistribution of data across partitions, which involves (expensive) copying of data across executors and machines. This is known as the **shuffle**. For example, if we do a `reduceByKey` operation on a key-value pair RDD, Spark needs to collect all pairs with the same key in the same partition to do the reduction.\n", "\n", "For key-value RDDs, you have some control over the partitioning of the RDDs. In particular, you can ask Spark to partition a set of keys so that they are guaranteed to appear together on some node. This can minimize a lot of data transfer. For example, suppose you have a large key-value RDD consisting of user_name: comments pairs from a web user community.
Every night, you want to update it with new user comments using a join operation." ] }, { "cell_type": "code", "execution_count": 17, "metadata": { "collapsed": false }, "outputs": [], "source": [ "def fake_data(n, val):\n", "    users = list(map(''.join, np.random.choice(list(string.ascii_lowercase), (n,2))))\n", "    comments = [val]*n\n", "    return tuple(zip(users, comments))" ] }, { "cell_type": "code", "execution_count": 18, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "[('uw', 'a'),\n", " ('iv', 'a'),\n", " ('cy', 'a'),\n", " ('to', 'a'),\n", " ('ea', 'a'),\n", " ('jc', 'a'),\n", " ('th', 'a'),\n", " ('pe', 'a'),\n", " ('rf', 'a'),\n", " ('ng', 'a')]" ] }, "execution_count": 18, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data = fake_data(10000, 'a')\n", "list(data)[:10]" ] }, { "cell_type": "code", "execution_count": 19, "metadata": { "collapsed": false }, "outputs": [], "source": [ "rdd = sc.parallelize(data).reduceByKey(lambda x, y: x+y)" ] }, { "cell_type": "code", "execution_count": 20, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "[('ro', 'b'),\n", " ('vf', 'b'),\n", " ('es', 'b'),\n", " ('er', 'b'),\n", " ('kq', 'b'),\n", " ('gw', 'b'),\n", " ('jt', 'b'),\n", " ('my', 'b'),\n", " ('xx', 'b'),\n", " ('ui', 'b')]" ] }, "execution_count": 20, "metadata": {}, "output_type": "execute_result" } ], "source": [ "new_data = fake_data(1000, 'b')\n", "list(new_data)[:10]" ] }, { "cell_type": "code", "execution_count": 21, "metadata": { "collapsed": true }, "outputs": [], "source": [ "rdd_new = sc.parallelize(new_data).reduceByKey(lambda x, y: x+y).cache()" ] }, { "cell_type": "code", "execution_count": 22, "metadata": { "collapsed": false }, "outputs": [], "source": [ "rdd_updated = rdd.join(rdd_new)" ] }, { "cell_type": "code", "execution_count": 23, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "[('sz', ('aaaaaaaaaaaaaaaa', 'bbb')),\n", " ('sc', ('aaaaaaaaaaaaaa', 'bb')),\n", " ('kt', ('aaaaaaaaaaaaaaaa', 'b')),\n", " ('wg', ('aaaaaaaaaaaaaaaaaaa', 'bb')),\n", " ('vt', ('aaaaaaaaaaaaa', 'b')),\n", " ('xb', ('aaaaaaaaaaaaaaaaaaa', 'b')),\n", " ('oa', ('aaaaaaaaaaaaaaa', 'bb')),\n", " ('uy', ('aaaaaaaaaaaaaaaaaaaaa', 'b')),\n", " ('gu', ('aaaaaaaaaa', 'bb')),\n", " ('gb', ('aaaaaaaaaaaaaaaa', 'bb'))]" ] }, "execution_count": 23, "metadata": {}, "output_type": "execute_result" } ], "source": [ "rdd_updated.take(10)" ] }, { "cell_type": "markdown", "metadata": { "collapsed": true }, "source": [ "### Using `partitionBy`\n", "\n", "The `join` operation will hash all the keys of both `rdd` and `rdd_new`, sending keys with the same hashes to the same node for the actual join operation. This involves a lot of unnecessary data transfer. Since `rdd` is a much larger data set than `rdd_new`, we can instead fix the partitioning of `rdd` and only shuffle the much smaller `rdd_new` during the join. This is done by `rdd.partitionBy(numPartitions)`, where `numPartitions` should be at least twice the number of cores.\n"
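, "\n", "A small sketch of how you might confirm the partitioning before relying on it (`rdd_part` is a hypothetical name and the `10` an arbitrary illustrative value):\n", "\n", "```python\n", "rdd_part = sc.parallelize(data).reduceByKey(lambda x, y: x + y).partitionBy(10).cache()\n", "rdd_part.getNumPartitions()        # -> 10\n", "rdd_part.partitioner is not None   # -> True; a join can now reuse this partitioning\n", "```\n", "\n", "Note that the pre-partitioned RDD should be cached; otherwise it would be re-partitioned every time it is used."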
] }, { "cell_type": "code", "execution_count": 24, "metadata": { "collapsed": true }, "outputs": [], "source": [ "rdd2 = sc.parallelize(data).reduceByKey(lambda x, y: x+y)\n", "rdd2 = rdd2.partitionBy(10).cache()" ] }, { "cell_type": "code", "execution_count": 25, "metadata": { "collapsed": true }, "outputs": [], "source": [ "rdd2_updated = rdd2.join(rdd_new)" ] }, { "cell_type": "code", "execution_count": 26, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "[('sz', ('aaaaaaaaaaaaaaaa', 'bbb')),\n", " ('tp', ('aaaaaaaaaaaaaaa', 'b')),\n", " ('sf', ('aaaaaaaaaaaaaaaaaaaaaa', 'bbb')),\n", " ('qo', ('aaaaaaaaaaaaaaaaaaa', 'bbb')),\n", " ('nh', ('aaaaaaaaaaaaa', 'b')),\n", " ('df', ('aaaaaaaaaaaa', 'bb')),\n", " ('kw', ('aaaaaaaaaaaaaaaaaa', 'bb')),\n", " ('fo', ('aaaaaaaaaaaaaaa', 'b')),\n", " ('tl', ('aaaaaaaaaaaaaa', 'bb')),\n", " ('lh', ('aaaaaaaaaaaaa', 'b'))]" ] }, "execution_count": 26, "metadata": {}, "output_type": "execute_result" } ], "source": [ "rdd2_updated.take(10)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Piping to External Programs\n", "----\n", "\n", "Suppose it is more convenient or efficient to write a function in some other language to process data. We can **pipe** data from Spark to the external program (script) that performs the calculation via standard input and output. The example below shows using a C++ program to calculate the sum of squares for collections of numbers." ] }, { "cell_type": "code", "execution_count": 27, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Overwriting foo.cpp\n" ] } ], "source": [ "%%file foo.cpp\n", "\n", "#include \n", "#include \n", "#include \n", "#include \n", "#include \n", "using namespace std;\n", "\n", "double sum_squares(double x, double y) {\n", " return x + y*y;\n", "};\n", "\n", "int main() {\n", "\n", " string s;\n", "\n", " while (cin) {\n", "\n", " getline(cin, s);\n", " stringstream stream(s);\n", " vector v;\n", "\n", " while(1) {\n", " double u;\n", " stream >> u;\n", " if(!stream)\n", " break;\n", " v.push_back(u);\n", " }\n", " if (v.size()) {\n", " double x = accumulate(v.begin(), v.end(), 0.0, sum_squares);\n", " cout << x << endl;\n", " }\n", " }\n", "}" ] }, { "cell_type": "code", "execution_count": 28, "metadata": { "collapsed": false }, "outputs": [], "source": [ "! 
g++ foo.cpp -o foo" ] }, { "cell_type": "code", "execution_count": 29, "metadata": { "collapsed": false }, "outputs": [], "source": [ "xs = np.random.random((10, 3))\n", "np.savetxt('numbers.txt', xs)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Feed data via redirection" ] }, { "cell_type": "code", "execution_count": 30, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "2.12948\n", "1.27958\n", "0.711174\n", "0.145084\n", "1.53344\n", "1.00307\n", "1.64678\n", "1.35042\n", "1.77033\n", "1.26898\n" ] } ], "source": [ "%%bash\n", "\n", "./foo < numbers.txt" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Feed data via piping" ] }, { "cell_type": "code", "execution_count": 31, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "2.12948\n", "1.27958\n", "0.711174\n", "0.145084\n", "1.53344\n", "1.00307\n", "1.64678\n", "1.35042\n", "1.77033\n", "1.26898\n" ] } ], "source": [ "%%bash\n", "\n", "cat numbers.txt | ./foo" ] }, { "cell_type": "code", "execution_count": 32, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "5.270539741683482049e-01 9.538059079198231149e-01 9.705379757246932471e-01\r\n", "8.224342507879207620e-01 4.863559000065119653e-01 6.055084142317349594e-01\r\n", "3.776646942097252602e-01 6.458831090447878509e-01 3.890739818068755795e-01\r\n", "3.894152894179003788e-02 3.690381691456464663e-01 8.589506712912342579e-02\r\n", "7.596819591747848710e-01 5.785597615102290314e-01 7.884100040999678649e-01\r\n", "8.717886662425843314e-01 4.836717890004667009e-01 9.547083256957378250e-02\r\n", "7.107374952186653605e-01 5.218853770211685505e-01 9.323470966394742376e-01\r\n", "8.793413319051871513e-01 3.959469860304939415e-01 6.483846319494085408e-01\r\n", "9.579829009525054895e-01 2.739685046015039038e-01 8.817835757073387848e-01\r\n", "5.315242953832449713e-01 3.254503074377856908e-01 9.383707453566396683e-01\r\n" ] } ], "source": [ "!head numbers.txt" ] }, { "cell_type": "code", "execution_count": 33, "metadata": { "collapsed": false }, "outputs": [], "source": [ "rdd = sc.textFile('numbers.txt')" ] }, { "cell_type": "code", "execution_count": 34, "metadata": { "collapsed": false }, "outputs": [], "source": [ "from pyspark import SparkFiles\n", "\n", "def prepare(line):\n", "    \"\"\"Each line contains numbers separated by a space.\"\"\"\n", "    return ' '.join(line.split()) + '\\n'\n", "\n", "# pipe data to the external program\n", "func = './foo'\n", "sc.addFile(func)\n", "ss = rdd.map(lambda s: prepare(s)).pipe(SparkFiles.get(func))" ] }, { "cell_type": "code", "execution_count": 35, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "array([ 2.12948 , 1.27958 , 0.711174, 0.145084, 1.53344 , 1.00307 ,\n", " 1.64678 , 1.35042 , 1.77033 , 1.26898 ])" ] }, "execution_count": 35, "metadata": {}, "output_type": "execute_result" } ], "source": [ "np.array(ss.collect(), dtype='float')" ] }, { "cell_type": "code", "execution_count": 36, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "array([ 2.12947556, 1.2795806 , 0.71117418, 0.14508358, 1.53343841,\n", " 1.00306856, 1.64678324, 1.35041782, 1.77033225, 1.26897563])" ] }, "execution_count": 36, "metadata": {}, "output_type": "execute_result" } ], "source": [ "np.sum(xs**2, 1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Version" ] }, { "cell_type": "code",
"execution_count": 37, "metadata": { "collapsed": false }, "outputs": [ { "data": { "application/json": { "Software versions": [ { "module": "Python", "version": "3.5.1 64bit [GCC 4.2.1 (Apple Inc. build 5577)]" }, { "module": "IPython", "version": "4.0.3" }, { "module": "OS", "version": "Darwin 15.4.0 x86_64 i386 64bit" }, { "module": "pyspark", "version": "The 'pyspark' distribution was not found and is required by the application" }, { "module": "numpy", "version": "1.10.4" } ] }, "text/html": [ "
<table><tr><th>Software</th><th>Version</th></tr><tr><td>Python</td><td>3.5.1 64bit [GCC 4.2.1 (Apple Inc. build 5577)]</td></tr><tr><td>IPython</td><td>4.0.3</td></tr><tr><td>OS</td><td>Darwin 15.4.0 x86_64 i386 64bit</td></tr><tr><td>pyspark</td><td>The 'pyspark' distribution was not found and is required by the application</td></tr><tr><td>numpy</td><td>1.10.4</td></tr><tr><td colspan='2'>Tue Apr 19 13:19:24 2016 EDT</td></tr></table>
" ], "text/latex": [ "\\begin{tabular}{|l|l|}\\hline\n", "{\\bf Software} & {\\bf Version} \\\\ \\hline\\hline\n", "Python & 3.5.1 64bit [GCC 4.2.1 (Apple Inc. build 5577)] \\\\ \\hline\n", "IPython & 4.0.3 \\\\ \\hline\n", "OS & Darwin 15.4.0 x86\\_64 i386 64bit \\\\ \\hline\n", "pyspark & The 'pyspark' distribution was not found and is required by the application \\\\ \\hline\n", "numpy & 1.10.4 \\\\ \\hline\n", "\\hline \\multicolumn{2}{|l|}{Tue Apr 19 13:19:24 2016 EDT} \\\\ \\hline\n", "\\end{tabular}\n" ], "text/plain": [ "Software versions\n", "Python 3.5.1 64bit [GCC 4.2.1 (Apple Inc. build 5577)]\n", "IPython 4.0.3\n", "OS Darwin 15.4.0 x86_64 i386 64bit\n", "pyspark The 'pyspark' distribution was not found and is required by the application\n", "numpy 1.10.4\n", "Tue Apr 19 13:19:24 2016 EDT" ] }, "execution_count": 37, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%load_ext version_information\n", "%version_information pyspark, numpy" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.5.1" } }, "nbformat": 4, "nbformat_minor": 0 }