Parallel Programming¶
The goal is to design parallel programs that are flexible, efficient and simple.
Step 0: Start by profiling a serial program to identify bottlenecks
Step 1: Are there opportunities for parallelism?
- Can tasks be performed in parallel?
  - Function calls
  - Loops
- Can data be split and operated on in parallel?
  - Decomposition of arrays along rows, columns, blocks
  - Decomposition of trees into sub-trees
- Is there a pipeline with a sequence of stages?
  - Data preprocessing and analysis
  - Graphics rendering
Step 2: What is the nature of the parallelism?
- Linear
  - Embarrassingly parallel programs
- Recursive
  - Adaptive partitioning methods
Step 3: What is the granularity?
- 10s of jobs
- 1000s of jobs
Step 4: Choose an algorithm
- Organize by tasks
  - Task parallelism
  - Divide and conquer
- Organize by data
  - Geometric decomposition
  - Recursive decomposition
- Organize by flow
  - Pipeline (see the sketch after this outline)
  - Event-based processing
Step 5: Map to program and data structures
- Program structures
  - Single program multiple data (SPMD)
  - Master/worker
  - Loop parallelism
  - Fork/join
- Data structures
  - Shared data
  - Shared queue
  - Distributed array
Step 6: Map to parallel environment
- Multi-core shared memory
  - Cython with OpenMP
  - multiprocessing
  - IPython.cluster
- Multi-computer
  - IPython.cluster
  - MPI
  - Hadoop / Spark
- GPU
  - CUDA
  - OpenCL
Step 7: Execute, debug, tune in parallel environment
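As a concrete example of the flow-based patterns in Step 4, here is a minimal sketch of a two-stage pipeline: a preprocessing stage feeds an analysis stage through a shared queue, with each stage in its own process. The stage functions and the doubling transformation are made up purely for illustration.

# Sketch of a two-stage pipeline (preprocess -> analyze) connected by a queue
from multiprocessing import Process, Queue

def preprocess(q_out, items):
    for x in items:
        q_out.put(x * 2)          # stage 1: transform raw items
    q_out.put(None)               # sentinel: signal that no more work is coming

def analyze(q_in, q_res):
    total = 0
    while True:
        x = q_in.get()
        if x is None:
            break
        total += x                # stage 2: accumulate results
    q_res.put(total)

if __name__ == '__main__':
    q, q_res = Queue(), Queue()
    p1 = Process(target=preprocess, args=(q, range(100)))
    p2 = Process(target=analyze, args=(q, q_res))
    p1.start(); p2.start()
    p1.join(); p2.join()
    print(q_res.get())            # 2 * (0 + 1 + ... + 99) = 9900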
Embarrassingly parallel programs¶
Many statistical problems are embarrassingly parallel and can be easily decomposed into independent tasks or data sets. Here are several examples:
- Monte Carlo integration
- Multiple chains of MCMC
- Bootstrap for confidence intervals
- Power calculations by simulation
- Permutation-resampling tests
- Fitting same model on multiple data sets
Other problems are serial at small scale, but can be parallelized at large scales. For example, EM and MCMC iterations are inherently serial since there is a dependence on the previous state, but within a single iteration, there can be many thousands of density calculations (one for each data point to calculate the likelihood), and this is an embarrassingly parallel problem within a single iteration.
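For instance, the per-observation density evaluations inside one iteration can be split into chunks and farmed out to worker processes. The following is only a sketch, assuming scipy is available; the data and the normal model are invented for illustration.

# Sketch: parallel per-point log-likelihood evaluation within a single iteration
import numpy as np
from multiprocessing import Pool
from scipy.stats import norm

def chunk_loglik(args):
    # Log-likelihood contribution of one chunk of data under N(mu, sigma)
    chunk, mu, sigma = args
    return norm.logpdf(chunk, mu, sigma).sum()

if __name__ == '__main__':
    data = np.random.normal(0, 1, size=1_000_000)   # made-up data
    mu, sigma = 0.0, 1.0
    chunks = np.array_split(data, 8)
    with Pool(4) as pool:
        loglik = sum(pool.map(chunk_loglik, [(c, mu, sigma) for c in chunks]))
    print(loglik)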
These “low-hanging fruit” problems are great because they offer a path to easy parallelism with minimal complexity.
Executing parallel code¶
The bigger the problem, the more scope there is for parallelism.
Amdahl’s law says that the speedup from parallelization is bounded by the ratio of parallelizable to irreducibly serial code in the algorithm. However, for big data analysis, Gustafson’s law is more relevant. This says that we are nearly always interested in increasing the size of the parallelizable bits, and that the ratio of parallelizable to irreducibly serial code is not a static quantity but depends on data size. For example, Gibbs sampling has an irreducibly serial nature, but for large samples, each iteration may be able to perform PDF evaluations in parallel for very large numbers of data points.
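A quick back-of-envelope calculation makes the contrast concrete. The formulas below are the standard statements of the two laws; the parallel fraction of 0.95 and the worker counts are arbitrary illustrative values.

# p = parallelizable fraction of the work, n = number of workers
def amdahl(p, n):
    # Amdahl: speedup for a fixed problem size
    return 1 / ((1 - p) + p / n)

def gustafson(p, n):
    # Gustafson: scaled speedup when the parallel part grows with the data
    return (1 - p) + p * n

for n in (4, 16, 64):
    print(n, round(amdahl(0.95, n), 1), round(gustafson(0.95, n), 1))
# Amdahl saturates near 1/0.05 = 20x, while the scaled speedup keeps growing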
Threading¶
The Python interpreter only allows one thread to execute Python bytecode at any one time. This is known as the Global Interpreter Lock (GIL) and generally makes threads an ineffective strategy for CPU-bound tasks. However, threads are useful for I/O-bound tasks such as reading, writing, or downloading files, since a thread gives up the lock once an I/O operation has been initiated.
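For example, a thread pool can overlap many downloads because each thread spends most of its time waiting on the network rather than holding the GIL. This is a sketch only; the URLs are placeholders, not real endpoints.

# Sketch: I/O-bound work (downloads) parallelized with threads
from concurrent.futures import ThreadPoolExecutor
from urllib.request import urlopen

urls = ['https://example.com/data%d.csv' % i for i in range(10)]  # placeholders

def fetch(url):
    # The GIL is released while the thread waits on the network
    with urlopen(url) as f:
        return f.read()

with ThreadPoolExecutor(max_workers=4) as pool:
    pages = list(pool.map(fetch, urls))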
CPU-bound task¶
Estimating π using Monte Carlo integration
This is clearly a toy example, but the template can be used for most embarrassingly parallel problems and is a good example of a CPU-bound task.
In [1]:
import numpy as np

def mc_pi(n):
    # Count points that fall inside the unit circle inscribed in [-1, 1] x [-1, 1]
    s = 0
    for i in range(n):
        x = np.random.uniform(-1, 1)
        y = np.random.uniform(-1, 1)
        if (x**2 + y**2) < 1:
            s += 1
    return 4*s/n
In [2]:
n = int(1e7)
In [3]:
%%time
mc_pi(n)
CPU times: user 15.8 s, sys: 21.1 ms, total: 15.8 s
Wall time: 15.8 s
Out[3]:
3.1417504
Optimizations without parallelism¶
Review of techniques to accelerate Python code.
In [4]:
def mc_pi1(n):
    x = np.random.uniform(-1, 1, (n,2))
    return 4*np.sum((x**2).sum(1) < 1)/n
In [5]:
%%time
mc_pi1(n)
CPU times: user 786 ms, sys: 300 ms, total: 1.09 s
Wall time: 1.08 s
Out[5]:
3.1411228000000002
In [6]:
from numba import jit, njit
In [7]:
@njit()
def mc_pi2(n):
    s = 0
    for i in range(n):
        x = np.random.uniform(-1, 1)
        y = np.random.uniform(-1, 1)
        if (x**2 + y**2) < 1:
            s += 1
    return 4*s/n
In [8]:
%%time
mc_pi2(n)
CPU times: user 639 ms, sys: 9.36 ms, total: 648 ms
Wall time: 645 ms
Out[8]:
3.1420776
Cython¶
It is a little more work to optimize the Cython version. We do not use numpy.random as that is optimized for generating arrays rather than single numbers. Instead we use the GNU Scientific Library (wrapped for us in the package cython_gsl) to draw uniform random variates.
pip install cythongsl
In [9]:
%load_ext cython
In [10]:
%%cython -a -lgsl

import cython
from cython_gsl cimport *

@cython.cdivision(True)
def mc_pi3(int n):
    cdef gsl_rng_type * T
    cdef gsl_rng * r
    cdef double s = 0.0
    cdef double x, y
    cdef int i

    gsl_rng_env_setup()
    T = gsl_rng_default
    r = gsl_rng_alloc(T)

    for i in range(n):
        x = 2*gsl_rng_uniform(r) - 1
        y = 2*gsl_rng_uniform(r) - 1
        if (x**2 + y**2) < 1:
            s += 1
    return 4*s/n
Out[10]:
[Annotated Cython output elided. Yellow lines hint at Python interaction; clicking a line that starts with "+" shows the C code that Cython generated for it. Here only the def line and the return statement touch Python objects; the loop body compiles to plain C.]
In [11]:
%%time
mc_pi3(n)
CPU times: user 308 ms, sys: 1.39 ms, total: 309 ms
Wall time: 308 ms
Out[11]:
3.1414584
Parallelization with threads¶
We reduce the number of Monte Carlo samples by a factor of 10, but run 10 such jobs independently. The result returned is an iterator over the 10 values, which we then average. Notice that threading does not help much, if at all.
In [12]:
from concurrent.futures import ThreadPoolExecutor
In [13]:
%%time
njobs = 10
with ThreadPoolExecutor(max_workers=4) as pool:
res = pool.map(mc_pi3, [int(1e6)]*njobs)
print(sum(res)/njobs)
3.1427279999999995
CPU times: user 311 ms, sys: 7.19 ms, total: 319 ms
Wall time: 313 ms
Parallelization with processes¶
We switch to a multi-process model. Unlike threads, processes do run in parallel. Now you should see some speedup from parallelization.
In [14]:
from concurrent.futures import ProcessPoolExecutor
In [15]:
%%time
njobs = 10
with ProcessPoolExecutor(max_workers=4) as pool:
res = pool.map(mc_pi3, [int(1e6)]*njobs)
print(sum(res)/njobs)
3.1427279999999995
CPU times: user 15.2 ms, sys: 31.5 ms, total: 46.6 ms
Wall time: 138 ms
Using multiprocessing¶
There is another module for process-based parallelism in the standard library that provides more data structures and control - for example, if you need the processes to share some data or communicate in some way. See the official documentation for details.
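For example, here is a minimal sketch (not from the notes above) of several processes incrementing a shared counter through a multiprocessing.Value, with a Lock guarding against lost updates; the counts are arbitrary.

# Sketch: processes sharing state via multiprocessing.Value
from multiprocessing import Process, Value, Lock

def work(counter, lock, n):
    for _ in range(n):
        with lock:                # serialize access to the shared value
            counter.value += 1

if __name__ == '__main__':
    counter = Value('i', 0)       # shared integer, initially 0
    lock = Lock()
    procs = [Process(target=work, args=(counter, lock, 1000)) for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(counter.value)          # 4000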
In [16]:
from multiprocessing import cpu_count, Pool
In [17]:
cpu_count()
Out[17]:
8
In [18]:
%%time
pool = Pool()
res = pool.map(mc_pi3, [int(1e6)]*njobs)
pool.close()
print(sum(res)/njobs)
3.1427279999999995
CPU times: user 13.7 ms, sys: 43.6 ms, total: 57.3 ms
Wall time: 116 ms
Running functions with multiple arguments¶
If a function f takes multiple arguments, we can use starmap. The starmap method applies f(*args) to each tuple of arguments in the iterable passed to the worker pool.
In [19]:
def add(a, b):
return a + b
In [20]:
x = np.arange(10)
y = x[::-1]
In [21]:
list(zip(x, y))
Out[21]:
[(0, 9),
(1, 8),
(2, 7),
(3, 6),
(4, 5),
(5, 4),
(6, 3),
(7, 2),
(8, 1),
(9, 0)]
In [22]:
pool = Pool()
res = pool.starmap(add, list(zip(x, y)))
pool.close()
res
Out[22]:
[9, 9, 9, 9, 9, 9, 9, 9, 9, 9]
Using ipyparallel¶
Parallel execution is tightly integrated with Jupyter via the ipyparallel package. Install with

conda install ipyparallel
ipcluster nbextension enable

You can start a cluster on the IPython Clusters tab.
In [23]:
from ipyparallel import Client
In [24]:
rc = Client()
In [25]:
rc.ids
Out[25]:
[0, 1, 2, 3, 4, 5, 6, 7]
In [26]:
dv = rc[:]
In [27]:
res = dv.map_sync(mc_pi1, [int(1e6)] * 10)
print(sum(res)/njobs)
3.1417084