{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "Parallel Programming\n", "====\n", "\n", "The goal is to design parallel programs that are flexible, efficient and simple.\n", "\n", "**Step 0**: Start by profiling a serial program to identify bottlenecks\n", "\n", "**Step 1**: Are there for opportunities for parallelism?\n", "\n", "- Can tasks be performed in parallel?\n", " - Function calls\n", " - Loops\n", "- Can data be split and operated on in parallel?\n", " - Decomposition of arrays along rows, columns, blocks\n", " - Decomposition of trees into sub-trees\n", "- Is there a pipeline with a sequence of stages?\n", " - Data preprocessing and analysis\n", " - Graphics rendering\n", "\n", "**Step 2**: What is the nature of the parallelism?\n", "\n", "- Linear\n", " - Embarrassingly parallel programs\n", "- Recursive\n", " - Adaptive partitioning methods\n", "\n", "**Step 3**: What is the granularity?\n", "\n", "- 10s of jobs\n", "- 1000s of jobs\n", "\n", "**Step 4**: Choose an algorithm\n", "\n", "- Organize by tasks\n", " - Task parallelism\n", " - Divide and conquer\n", "\n", "- Organize by data\n", " - Geometric decomposition\n", " - Recursive decomposition\n", "\n", "- Organize by flow\n", " - Pipeline\n", " - Event-based processing\n", "\n", "**Step 5**: Map to program and data structures\n", "\n", "- Program structures\n", " - Single program multiple data (SPMD)\n", " - Master/worker\n", " - Loop parallelism\n", " - Fork/join\n", "- Data structures \n", " - Shared data\n", " - Shared queue\n", " - Distributed array\n", "\n", "**Step 6**: Map to parallel environment\n", "\n", "- Multi-core shared memory\n", " - Cython with OpenMP\n", " - multiprocessing\n", " - IPython.cluster\n", "- Multi-computer\n", " - IPython.cluster\n", " - MPI\n", " - Hadoop / Spark\n", "- GPU\n", " - CUDA\n", " - OpenCL\n", "\n", "**Step 7**: Execute, debug, tune in parallel environment" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Embarrassingly 
parallel programs\n", "----\n", "\n", "Many statistical problems are embarrassingly parallel and can be easily decomposed into independent tasks or data sets. Here are several examples:\n", "\n", "- Monte Carlo integration\n", "- Multiple chains of MCMC\n", "- Bootstrap for confidence intervals\n", "- Power calculations by simulation\n", "- Permutation-resampling tests \n", "- Fitting same model on multiple data sets\n", "\n", "Other problems are serial at small scale, but can be parallelized at large scales. For example, EM and MCMC iterations are inherently serial since there is a dependence on the previous state, but within a single iteration, there can be many thousands of density calculations (one for each data point to calculate the likelihood), and this is an embarrassingly parallel problem within a single iteration. \n", "\n", "These \"low hanging fruits\" are great because they offer a path to easy parallelism with minimal complexity." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Executing parallel code\n", "----\n", "\n", "**The bigger the problem, the more scope there is for parallelism**\n", "\n", "**Amhdahls' law** says that the speedup from parallelization is bounded by the ratio of parallelizable to irreducibly serial code in the algorithm. However, for big data analysis, **Gustafson's Law** is more relevant. This says that we are nearly always interested in increasing the size of the parallelizable bits, and the ratio of parallelizable to irreducibly serial code is not a static quantity but depends on data size. For example, Gibbs sampling has an irreducibly serial nature, but for large samples, each iteration may be able perform PDF evaluations in parallel for zillions of data points." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Threading\n", "----\n", "\n", "The Python interpreter only allows one thread to execute at any one time. 
This is known as the Global Interpreter Lock (GIL) and generally makes the use of threads an ineffective strategy for CPU-bound tasks. However, threads are useful in I/O-bound tasks such as reading/writing/downloading files, since threads give up their lock once an I/O task has been initiated. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### CPU-bound task\n", "\n", "**Estimating $\\pi$ using Monte Carlo integration**\n", "\n", "This is clearly a toy example, but the template can be used for most embarrassingly parallel problems and is a good example of a CPU-bound task." ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "collapsed": true }, "outputs": [], "source": [ "import numpy as np\n", "\n", "def mc_pi(n):\n", "    s = 0\n", "    for i in range(n):\n", "        x = np.random.uniform(-1, 1)\n", "        y = np.random.uniform(-1, 1)\n", "        if (x**2 + y**2) < 1:\n", "            s += 1\n", "    return 4*s/n" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "collapsed": false }, "outputs": [], "source": [ "n = int(1e7)" ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 15.8 s, sys: 21.1 ms, total: 15.8 s\n", "Wall time: 15.8 s\n" ] }, { "data": { "text/plain": [ "3.1417504" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%%time\n", "mc_pi(n)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Optimizations without parallelism\n", "\n", "Review of techniques to accelerate Python code."
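] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As an illustration of the GIL behavior described above, here is a minimal sketch (added for illustration; exact timings depend on the machine): running a CPU-bound function across threads gives essentially no speedup, while I/O-bound work (simulated here with time.sleep, which releases the GIL) overlaps cleanly.\n", "\n", "```python\n", "import time\n", "from concurrent.futures import ThreadPoolExecutor\n", "\n", "def cpu_task(n):\n", "    # CPU-bound: pure Python arithmetic holds the GIL\n", "    s = 0\n", "    for i in range(n):\n", "        s += i * i\n", "    return s\n", "\n", "def io_task(delay):\n", "    # I/O-bound stand-in: time.sleep releases the GIL\n", "    time.sleep(delay)\n", "    return delay\n", "\n", "# Four CPU-bound tasks in threads take about as long as running them serially\n", "start = time.time()\n", "with ThreadPoolExecutor(4) as pool:\n", "    cpu_results = list(pool.map(cpu_task, [10**6] * 4))\n", "cpu_elapsed = time.time() - start\n", "\n", "# Four 0.1 s sleeps overlap, finishing in roughly 0.1 s rather than 0.4 s\n", "start = time.time()\n", "with ThreadPoolExecutor(4) as pool:\n", "    io_results = list(pool.map(io_task, [0.1] * 4))\n", "io_elapsed = time.time() - start\n", "\n", "print(cpu_elapsed, io_elapsed)\n", "```"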
] }, { "cell_type": "code", "execution_count": 4, "metadata": { "collapsed": false }, "outputs": [], "source": [ "def mc_pi1(n):\n", " x = np.random.uniform(-1, 1, (n,2))\n", " return 4*np.sum((x**2).sum(1) < 1)/n" ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 786 ms, sys: 300 ms, total: 1.09 s\n", "Wall time: 1.08 s\n" ] }, { "data": { "text/plain": [ "3.1411228000000002" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%%time \n", "mc_pi1(n)" ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "collapsed": true }, "outputs": [], "source": [ "from numba import jit, njit" ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "collapsed": true }, "outputs": [], "source": [ "@njit()\n", "def mc_pi2(n):\n", " s = 0\n", " for i in range(n):\n", " x = np.random.uniform(-1, 1)\n", " y = np.random.uniform(-1, 1)\n", " if (x**2 + y**2) < 1:\n", " s += 1\n", " return 4*s/n" ] }, { "cell_type": "code", "execution_count": 8, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 639 ms, sys: 9.36 ms, total: 648 ms\n", "Wall time: 645 ms\n" ] }, { "data": { "text/plain": [ "3.1420776" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%%time\n", "mc_pi2(n)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Cython\n", "\n", "It is a little more work to optimize the Cython version. We do not use numpy.random as that is optimized for generating arrays rather than single numbers. 
Instead we use the GNU Scientific Library (wrapped for us in the package cython_gsl) to draw uniform random variates.\n", "\n", "```\n", "pip install cythongsl\n", "```" ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "collapsed": true }, "outputs": [], "source": [ "%load_ext cython" ] }, { "cell_type": "code", "execution_count": 10, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/html": [ "\n", "\n", "\n", "
\n", " \n", "Generated by Cython 0.23.4
\n", "\n",
" Yellow lines hint at Python interaction.
\n",
" Click on a line that starts with a \"+\" to see the C code that Cython generated for it.\n",
"
01: \n",
"02: import cython\n", "
03: from cython_gsl cimport *\n", "
04: \n",
"05: @cython.cdivision(True)\n", "
+06: def mc_pi3(int n):\n", "
/* Python wrapper */\n",
"static PyObject *__pyx_pw_46_cython_magic_73ea126a291c7fb73e3c8fcbef013dcf_1mc_pi3(PyObject *__pyx_self, PyObject *__pyx_arg_n); /*proto*/\n",
"static PyMethodDef __pyx_mdef_46_cython_magic_73ea126a291c7fb73e3c8fcbef013dcf_1mc_pi3 = {\"mc_pi3\", (PyCFunction)__pyx_pw_46_cython_magic_73ea126a291c7fb73e3c8fcbef013dcf_1mc_pi3, METH_O, 0};\n",
"static PyObject *__pyx_pw_46_cython_magic_73ea126a291c7fb73e3c8fcbef013dcf_1mc_pi3(PyObject *__pyx_self, PyObject *__pyx_arg_n) {\n",
" int __pyx_v_n;\n",
" PyObject *__pyx_r = 0;\n",
" __Pyx_RefNannyDeclarations\n",
" __Pyx_RefNannySetupContext(\"mc_pi3 (wrapper)\", 0);\n",
" assert(__pyx_arg_n); {\n",
" __pyx_v_n = __Pyx_PyInt_As_int(__pyx_arg_n); if (unlikely((__pyx_v_n == (int)-1) && PyErr_Occurred())) {__pyx_filename = __pyx_f[0]; __pyx_lineno = 6; __pyx_clineno = __LINE__; goto __pyx_L3_error;}\n",
" }\n",
" goto __pyx_L4_argument_unpacking_done;\n",
" __pyx_L3_error:;\n",
" __Pyx_AddTraceback(\"_cython_magic_73ea126a291c7fb73e3c8fcbef013dcf.mc_pi3\", __pyx_clineno, __pyx_lineno, __pyx_filename);\n",
" __Pyx_RefNannyFinishContext();\n",
" return NULL;\n",
" __pyx_L4_argument_unpacking_done:;\n",
" __pyx_r = __pyx_pf_46_cython_magic_73ea126a291c7fb73e3c8fcbef013dcf_mc_pi3(__pyx_self, ((int)__pyx_v_n));\n",
" int __pyx_lineno = 0;\n",
" const char *__pyx_filename = NULL;\n",
" int __pyx_clineno = 0;\n",
"\n",
" /* function exit code */\n",
" __Pyx_RefNannyFinishContext();\n",
" return __pyx_r;\n",
"}\n",
"\n",
"static PyObject *__pyx_pf_46_cython_magic_73ea126a291c7fb73e3c8fcbef013dcf_mc_pi3(CYTHON_UNUSED PyObject *__pyx_self, int __pyx_v_n) {\n",
" gsl_rng_type *__pyx_v_T;\n",
" gsl_rng *__pyx_v_r;\n",
" double __pyx_v_s;\n",
" double __pyx_v_x;\n",
" double __pyx_v_y;\n",
" CYTHON_UNUSED int __pyx_v_i;\n",
" PyObject *__pyx_r = NULL;\n",
" __Pyx_RefNannyDeclarations\n",
" __Pyx_RefNannySetupContext(\"mc_pi3\", 0);\n",
"/* … */\n",
" /* function exit code */\n",
" __pyx_L1_error:;\n",
" __Pyx_XDECREF(__pyx_t_4);\n",
" __Pyx_AddTraceback(\"_cython_magic_73ea126a291c7fb73e3c8fcbef013dcf.mc_pi3\", __pyx_clineno, __pyx_lineno, __pyx_filename);\n",
" __pyx_r = NULL;\n",
" __pyx_L0:;\n",
" __Pyx_XGIVEREF(__pyx_r);\n",
" __Pyx_RefNannyFinishContext();\n",
" return __pyx_r;\n",
"}\n",
"/* … */\n",
" __pyx_tuple_ = PyTuple_Pack(8, __pyx_n_s_n, __pyx_n_s_n, __pyx_n_s_T, __pyx_n_s_r, __pyx_n_s_s, __pyx_n_s_x, __pyx_n_s_y, __pyx_n_s_i); if (unlikely(!__pyx_tuple_)) {__pyx_filename = __pyx_f[0]; __pyx_lineno = 6; __pyx_clineno = __LINE__; goto __pyx_L1_error;}\n",
" __Pyx_GOTREF(__pyx_tuple_);\n",
" __Pyx_GIVEREF(__pyx_tuple_);\n",
"/* … */\n",
" __pyx_t_1 = PyCFunction_NewEx(&__pyx_mdef_46_cython_magic_73ea126a291c7fb73e3c8fcbef013dcf_1mc_pi3, NULL, __pyx_n_s_cython_magic_73ea126a291c7fb73e); if (unlikely(!__pyx_t_1)) {__pyx_filename = __pyx_f[0]; __pyx_lineno = 6; __pyx_clineno = __LINE__; goto __pyx_L1_error;}\n",
" __Pyx_GOTREF(__pyx_t_1);\n",
" if (PyDict_SetItem(__pyx_d, __pyx_n_s_mc_pi3, __pyx_t_1) < 0) {__pyx_filename = __pyx_f[0]; __pyx_lineno = 6; __pyx_clineno = __LINE__; goto __pyx_L1_error;}\n",
" __Pyx_DECREF(__pyx_t_1); __pyx_t_1 = 0;\n",
"07: cdef gsl_rng_type * T\n", "
08: cdef gsl_rng * r\n", "
+09: cdef double s = 0.0\n", "
__pyx_v_s = 0.0;\n",
"10: cdef double x, y\n", "
11: cdef int i\n", "
12: \n",
"+13: gsl_rng_env_setup()\n", "
gsl_rng_env_setup();\n",
" 14: \n",
"+15: T = gsl_rng_default\n", "
__pyx_v_T = gsl_rng_default;\n",
"+16: r = gsl_rng_alloc (T)\n", "
__pyx_v_r = gsl_rng_alloc(__pyx_v_T);\n",
" 17: \n",
"+18: for i in range(n):\n", "
__pyx_t_1 = __pyx_v_n;\n",
" for (__pyx_t_2 = 0; __pyx_t_2 < __pyx_t_1; __pyx_t_2+=1) {\n",
" __pyx_v_i = __pyx_t_2;\n",
"+19: x = 2*gsl_rng_uniform(r) - 1\n", "
__pyx_v_x = ((2.0 * gsl_rng_uniform(__pyx_v_r)) - 1.0);\n",
"+20: y = 2*gsl_rng_uniform(r)- 1\n", "
__pyx_v_y = ((2.0 * gsl_rng_uniform(__pyx_v_r)) - 1.0);\n",
"+21: if (x**2 + y**2) < 1:\n", "
__pyx_t_3 = (((pow(__pyx_v_x, 2.0) + pow(__pyx_v_y, 2.0)) < 1.0) != 0);\n",
" if (__pyx_t_3) {\n",
"/* … */\n",
" }\n",
" }\n",
"+22: s += 1\n", "
__pyx_v_s = (__pyx_v_s + 1.0);\n",
"+23: return 4*s/n\n", "
__Pyx_XDECREF(__pyx_r);\n", " __pyx_t_4 = PyFloat_FromDouble(((4.0 * __pyx_v_s) / __pyx_v_n)); if (unlikely(!__pyx_t_4)) {__pyx_filename = __pyx_f[0]; __pyx_lineno = 23; __pyx_clineno = __LINE__; goto __pyx_L1_error;}\n", " __Pyx_GOTREF(__pyx_t_4);\n", " __pyx_r = __pyx_t_4;\n", " __pyx_t_4 = 0;\n", " goto __pyx_L0;\n", "