from __future__ import division
import os
import sys
import glob
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
%matplotlib inline
%precision 4
plt.style.use('ggplot')
%load_ext cythonmagic
from numba import jit, typeof, int32, int64, float32, float64
import random

Writing Parallel Code

The goal is to design parallel programs that are flexible, efficient and simple.

Step 0: Start by profiling a serial program to identify bottlenecks

Step 1: Are there opportunities for parallelism?

  • Can tasks be performed in parallel?
    • Function calls
    • Loops
  • Can data be split and operated on in parallel?
    • Decomposition of arrays along rows, columns, blocks
    • Decomposition of trees into sub-trees
  • Is there a pipeline with a sequence of stages?
    • Data preprocessing and analysis
    • Graphics rendering

Step 2: What is the nature of the parallelism?

  • Linear
    • Embarrassingly parallel programs
  • Recursive
    • Adaptive partitioning methods

Step 3: What is the granularity?

  • 10s of jobs
  • 1000s of jobs

Step 4: Choose an algorithm

  • Organize by tasks
    • Task parallelism
    • Divide and conquer
  • Organize by data
    • Geometric decomposition
    • Recursive decomposition
  • Organize by flow
    • Pipeline
    • Event-based processing

Step 5: Map to program and data structures

  • Program structures
    • Single program multiple data (SPMD)
    • Master/worker
    • Loop parallelism
    • Fork/join
  • Data structures
    • Shared data
    • Shared queue
    • Distributed array

Step 6: Map to parallel environment

  • Multi-core shared memory
    • Cython with OpenMP
    • multiprocessing
    • IPython.cluster
  • Multi-computer
    • IPython.cluster
    • MPI
    • Hadoop / Spark
  • GPU
    • CUDA
    • OpenCL

Step 7: Execute, debug, tune in parallel environment

Concepts

  • A task is a chunk of work that a parallel Unit of Execution can do
  • A Unit of Execution (UE) is a process or thread
  • A Processing Element (PE) is a hardware computational unit - e.g. a hyperthreaded core
  • Load balance refers to how tasks are distributed to Processing Elements
  • Synchronization occurs when execution must stop at the same point for all Units of Execution
  • Race conditions occur when different Units of Execution compete for the same resource and the output depends on who gets the resource first
  • Deadlocks occur when A is waiting for B and B is waiting for A

Embarrassingly parallel programs

Many statistical problems are embarrassingly parallel and can easily be decomposed into independent tasks or data sets. Here are several examples (a bootstrap sketch follows the list):

  • Monte Carlo integration
  • Multiple chains of MCMC
  • Bootstrap for confidence intervals
  • Power calculations by simulation
  • Permutation-resampling tests
  • Fitting same model on multiple data sets
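
For instance, the bootstrap is just many independent resample-and-refit tasks. A minimal sketch using multiprocessing (the resample_mean helper and the simulated data are made up for illustration):

def resample_mean(seed):
    """Draw one bootstrap resample of the (inherited) data and return its mean."""
    np.random.seed(seed)
    sample = np.random.choice(data, size=len(data), replace=True)
    return sample.mean()

data = np.random.normal(0, 1, 1000)
pool = multiprocessing.Pool(4)
# each of the 1000 bootstrap replicates is an independent task
boot_means = pool.map(resample_mean, range(1000))
pool.close()
# 95% bootstrap confidence interval for the mean
print np.percentile(boot_means, [2.5, 97.5])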

Other problems are serial at small scale, but can be parallelized at large scales. For example, EM and MCMC iterations are inherently serial since there is a dependence on the previous state, but within a single iteration, there can be many thousands of density calculations (one for each data point to calculate the likelihood), and this is an embarrassingly parallel problem within a single iteration.
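
As a sketch of this within-iteration parallelism, the per-point density evaluations can be chunked across workers and the partial log-likelihoods summed. The N(0, 1) model, the split into 4 chunks, and the use of scipy.stats are assumptions for illustration:

from multiprocessing import Pool
from scipy.stats import norm

def chunk_loglik(chunk):
    """Log-likelihood contribution of one chunk of data under N(0, 1)."""
    return norm.logpdf(chunk).sum()

data = np.random.normal(0, 1, int(1e6))
pool = Pool(4)
for it in range(10):  # the outer loop is inherently serial
    # ... parameter updates that depend on the previous state go here ...
    # but the density evaluations within one iteration are embarrassingly parallel
    loglik = sum(pool.map(chunk_loglik, np.array_split(data, 4)))
pool.close()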

This “low-hanging fruit” is great because it offers a path to easy parallelism with minimal complexity.

Estimating \(\pi\) using Monte Carlo integration

This is clearly a toy example, but the template can be used for most embarrassingly parallel problems. First we see how much we can speed up the serial code by the use of compilation, then we apply parallel processing for a further linear speed-up in the number of processors.

def pi_python(n):
    """Estimate pi from the fraction of n random points that fall in the unit circle."""
    s = 0
    for i in range(n):
        x = random.uniform(-1, 1)
        y = random.uniform(-1, 1)
        if (x**2 + y**2) < 1:
            s += 1
    return 4.0*s/n
stats = %prun -r -q pi_python(1000000)
stats.sort_stats('time').print_stats(5);
       4000004 function calls in 2.329 seconds

 Ordered by: internal time
 List reduced from 6 to 5 due to restriction <5>

 ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      1    1.132    1.132    2.329    2.329 <ipython-input-5-fe39fab6b921>:1(pi_python)
2000000    0.956    0.000    1.141    0.000 random.py:358(uniform)
2000000    0.185    0.000    0.185    0.000 {method 'random' of '_random.Random' objects}
      1    0.056    0.056    0.056    0.056 {range}
      1    0.000    0.000    2.329    2.329 <string>:1(<module>)
def pi_numpy(n):
    """Vectorized version: count the points inside the unit circle in one pass."""
    xs = np.random.uniform(-1, 1, (n, 2))
    return 4.0*((xs**2).sum(axis=1) < 1).sum()/n
@jit
def pi_numba(n):
    # same loop as pi_python; numba JIT-compiles it
    s = 0
    for i in range(n):
        x = random.uniform(-1, 1)
        y = random.uniform(-1, 1)
        if x**2 + y**2 < 1:
            s += 1
    return 4.0*s/n

This uses the GNU Scientific Library (GSL). You may need to install it:

wget ftp://ftp.gnu.org/gnu/gsl/gsl-latest.tar.gz
tar -xzf gsl-latest.tar.gz
cd gsl-1.16
./configure --prefix=/usr/local
make
make install

and then

pip install cythongsl
%%cython -a -lgsl
import cython
import numpy as np
cimport numpy as np
from cython_gsl cimport gsl_rng_mt19937, gsl_rng, gsl_rng_alloc, gsl_rng_uniform

cdef gsl_rng *r = gsl_rng_alloc(gsl_rng_mt19937)

@cython.cdivision
@cython.boundscheck(False)
@cython.wraparound(False)
def pi_cython(int n):
    cdef int[:] s = np.zeros(n, dtype=np.int32)
    cdef int i = 0
    cdef double x, y
    for i in range(n):
        x = gsl_rng_uniform(r)*2 - 1
        y = gsl_rng_uniform(r)*2 - 1
        s[i] = x**2 + y**2 < 1
    cdef int hits = 0
    for i in range(n):
        hits += s[i]
    return 4.0*hits/n

Generated by Cython 0.22

+01: import cython
  __pyx_t_1 = PyDict_New(); if (unlikely(!__pyx_t_1)) {__pyx_filename = __pyx_f[0]; __pyx_lineno = 1; __pyx_clineno = __LINE__; goto __pyx_L1_error;}
  __Pyx_GOTREF(__pyx_t_1);
  if (PyDict_SetItem(__pyx_d, __pyx_n_s_test, __pyx_t_1) < 0) {__pyx_filename = __pyx_f[0]; __pyx_lineno = 1; __pyx_clineno = __LINE__; goto __pyx_L1_error;}
  __Pyx_DECREF(__pyx_t_1); __pyx_t_1 = 0;
+02: import numpy as np
  __pyx_t_1 = __Pyx_Import(__pyx_n_s_numpy, 0, -1); if (unlikely(!__pyx_t_1)) {__pyx_filename = __pyx_f[0]; __pyx_lineno = 2; __pyx_clineno = __LINE__; goto __pyx_L1_error;}
  __Pyx_GOTREF(__pyx_t_1);
  if (PyDict_SetItem(__pyx_d, __pyx_n_s_np, __pyx_t_1) < 0) {__pyx_filename = __pyx_f[0]; __pyx_lineno = 2; __pyx_clineno = __LINE__; goto __pyx_L1_error;}
  __Pyx_DECREF(__pyx_t_1); __pyx_t_1 = 0;
 03: cimport numpy as np
 04: from cython_gsl cimport gsl_rng_mt19937, gsl_rng, gsl_rng_alloc, gsl_rng_uniform
 05: 
+06: cdef gsl_rng *r = gsl_rng_alloc(gsl_rng_mt19937)
  __pyx_v_46_cython_magic_f0d9ca082c7b25a08598049bfef2323c_r = gsl_rng_alloc(gsl_rng_mt19937);
 07: 
 08: @cython.cdivision
 09: @cython.boundscheck(False)
 10: @cython.wraparound(False)
+11: def pi_cython(int n):
/* Python wrapper */
static PyObject *__pyx_pw_46_cython_magic_f0d9ca082c7b25a08598049bfef2323c_1pi_cython(PyObject *__pyx_self, PyObject *__pyx_arg_n); /*proto*/
static PyMethodDef __pyx_mdef_46_cython_magic_f0d9ca082c7b25a08598049bfef2323c_1pi_cython = {"pi_cython", (PyCFunction)__pyx_pw_46_cython_magic_f0d9ca082c7b25a08598049bfef2323c_1pi_cython, METH_O, 0};
static PyObject *__pyx_pw_46_cython_magic_f0d9ca082c7b25a08598049bfef2323c_1pi_cython(PyObject *__pyx_self, PyObject *__pyx_arg_n) {
  int __pyx_v_n;
  PyObject *__pyx_r = 0;
  __Pyx_RefNannyDeclarations
  __Pyx_RefNannySetupContext("pi_cython (wrapper)", 0);
  assert(__pyx_arg_n); {
    __pyx_v_n = __Pyx_PyInt_As_int(__pyx_arg_n); if (unlikely((__pyx_v_n == (int)-1) && PyErr_Occurred())) {__pyx_filename = __pyx_f[0]; __pyx_lineno = 11; __pyx_clineno = __LINE__; goto __pyx_L3_error;}
  }
  goto __pyx_L4_argument_unpacking_done;
  __pyx_L3_error:;
  __Pyx_AddTraceback("_cython_magic_f0d9ca082c7b25a08598049bfef2323c.pi_cython", __pyx_clineno, __pyx_lineno, __pyx_filename);
  __Pyx_RefNannyFinishContext();
  return NULL;
  __pyx_L4_argument_unpacking_done:;
  __pyx_r = __pyx_pf_46_cython_magic_f0d9ca082c7b25a08598049bfef2323c_pi_cython(__pyx_self, ((int)__pyx_v_n));
  int __pyx_lineno = 0;
  const char *__pyx_filename = NULL;
  int __pyx_clineno = 0;

  /* function exit code */
  __Pyx_RefNannyFinishContext();
  return __pyx_r;
}

static PyObject *__pyx_pf_46_cython_magic_f0d9ca082c7b25a08598049bfef2323c_pi_cython(CYTHON_UNUSED PyObject *__pyx_self, int __pyx_v_n) {
  __Pyx_memviewslice __pyx_v_s = { 0, 0, { 0 }, { 0 }, { 0 } };
  int __pyx_v_i;
  double __pyx_v_x;
  double __pyx_v_y;
  int __pyx_v_hits;
  PyObject *__pyx_r = NULL;
  __Pyx_RefNannyDeclarations
  __Pyx_RefNannySetupContext("pi_cython", 0);
/* … */
  /* function exit code */
  __pyx_L1_error:;
  __Pyx_XDECREF(__pyx_t_1);
  __Pyx_XDECREF(__pyx_t_2);
  __Pyx_XDECREF(__pyx_t_3);
  __Pyx_XDECREF(__pyx_t_4);
  __Pyx_XDECREF(__pyx_t_5);
  __PYX_XDEC_MEMVIEW(&__pyx_t_6, 1);
  __Pyx_AddTraceback("_cython_magic_f0d9ca082c7b25a08598049bfef2323c.pi_cython", __pyx_clineno, __pyx_lineno, __pyx_filename);
  __pyx_r = NULL;
  __pyx_L0:;
  __PYX_XDEC_MEMVIEW(&__pyx_v_s, 1);
  __Pyx_XGIVEREF(__pyx_r);
  __Pyx_RefNannyFinishContext();
  return __pyx_r;
}
/* … */
  __pyx_tuple__19 = PyTuple_Pack(7, __pyx_n_s_n, __pyx_n_s_n, __pyx_n_s_s, __pyx_n_s_i, __pyx_n_s_x, __pyx_n_s_y, __pyx_n_s_hits); if (unlikely(!__pyx_tuple__19)) {__pyx_filename = __pyx_f[0]; __pyx_lineno = 11; __pyx_clineno = __LINE__; goto __pyx_L1_error;}
  __Pyx_GOTREF(__pyx_tuple__19);
  __Pyx_GIVEREF(__pyx_tuple__19);
/* … */
  __pyx_t_1 = PyCFunction_NewEx(&__pyx_mdef_46_cython_magic_f0d9ca082c7b25a08598049bfef2323c_1pi_cython, NULL, __pyx_n_s_cython_magic_f0d9ca082c7b25a085); if (unlikely(!__pyx_t_1)) {__pyx_filename = __pyx_f[0]; __pyx_lineno = 11; __pyx_clineno = __LINE__; goto __pyx_L1_error;}
  __Pyx_GOTREF(__pyx_t_1);
  if (PyDict_SetItem(__pyx_d, __pyx_n_s_pi_cython, __pyx_t_1) < 0) {__pyx_filename = __pyx_f[0]; __pyx_lineno = 11; __pyx_clineno = __LINE__; goto __pyx_L1_error;}
  __Pyx_DECREF(__pyx_t_1); __pyx_t_1 = 0;
  __pyx_codeobj__20 = (PyObject*)__Pyx_PyCode_New(1, 0, 7, 0, 0, __pyx_empty_bytes, __pyx_empty_tuple, __pyx_empty_tuple, __pyx_tuple__19, __pyx_empty_tuple, __pyx_empty_tuple, __pyx_kp_s_Users_cliburn_ipython_cython__c, __pyx_n_s_pi_cython, 11, __pyx_empty_bytes); if (unlikely(!__pyx_codeobj__20)) {__pyx_filename = __pyx_f[0]; __pyx_lineno = 11; __pyx_clineno = __LINE__; goto __pyx_L1_error;}
+12:     cdef int[:] s = np.zeros(n, dtype=np.int32)
  __pyx_t_1 = __Pyx_GetModuleGlobalName(__pyx_n_s_np); if (unlikely(!__pyx_t_1)) {__pyx_filename = __pyx_f[0]; __pyx_lineno = 12; __pyx_clineno = __LINE__; goto __pyx_L1_error;}
  __Pyx_GOTREF(__pyx_t_1);
  __pyx_t_2 = __Pyx_PyObject_GetAttrStr(__pyx_t_1, __pyx_n_s_zeros); if (unlikely(!__pyx_t_2)) {__pyx_filename = __pyx_f[0]; __pyx_lineno = 12; __pyx_clineno = __LINE__; goto __pyx_L1_error;}
  __Pyx_GOTREF(__pyx_t_2);
  __Pyx_DECREF(__pyx_t_1); __pyx_t_1 = 0;
  __pyx_t_1 = __Pyx_PyInt_From_int(__pyx_v_n); if (unlikely(!__pyx_t_1)) {__pyx_filename = __pyx_f[0]; __pyx_lineno = 12; __pyx_clineno = __LINE__; goto __pyx_L1_error;}
  __Pyx_GOTREF(__pyx_t_1);
  __pyx_t_3 = PyTuple_New(1); if (unlikely(!__pyx_t_3)) {__pyx_filename = __pyx_f[0]; __pyx_lineno = 12; __pyx_clineno = __LINE__; goto __pyx_L1_error;}
  __Pyx_GOTREF(__pyx_t_3);
  PyTuple_SET_ITEM(__pyx_t_3, 0, __pyx_t_1);
  __Pyx_GIVEREF(__pyx_t_1);
  __pyx_t_1 = 0;
  __pyx_t_1 = PyDict_New(); if (unlikely(!__pyx_t_1)) {__pyx_filename = __pyx_f[0]; __pyx_lineno = 12; __pyx_clineno = __LINE__; goto __pyx_L1_error;}
  __Pyx_GOTREF(__pyx_t_1);
  __pyx_t_4 = __Pyx_GetModuleGlobalName(__pyx_n_s_np); if (unlikely(!__pyx_t_4)) {__pyx_filename = __pyx_f[0]; __pyx_lineno = 12; __pyx_clineno = __LINE__; goto __pyx_L1_error;}
  __Pyx_GOTREF(__pyx_t_4);
  __pyx_t_5 = __Pyx_PyObject_GetAttrStr(__pyx_t_4, __pyx_n_s_int32); if (unlikely(!__pyx_t_5)) {__pyx_filename = __pyx_f[0]; __pyx_lineno = 12; __pyx_clineno = __LINE__; goto __pyx_L1_error;}
  __Pyx_GOTREF(__pyx_t_5);
  __Pyx_DECREF(__pyx_t_4); __pyx_t_4 = 0;
  if (PyDict_SetItem(__pyx_t_1, __pyx_n_s_dtype, __pyx_t_5) < 0) {__pyx_filename = __pyx_f[0]; __pyx_lineno = 12; __pyx_clineno = __LINE__; goto __pyx_L1_error;}
  __Pyx_DECREF(__pyx_t_5); __pyx_t_5 = 0;
  __pyx_t_5 = __Pyx_PyObject_Call(__pyx_t_2, __pyx_t_3, __pyx_t_1); if (unlikely(!__pyx_t_5)) {__pyx_filename = __pyx_f[0]; __pyx_lineno = 12; __pyx_clineno = __LINE__; goto __pyx_L1_error;}
  __Pyx_GOTREF(__pyx_t_5);
  __Pyx_DECREF(__pyx_t_2); __pyx_t_2 = 0;
  __Pyx_DECREF(__pyx_t_3); __pyx_t_3 = 0;
  __Pyx_DECREF(__pyx_t_1); __pyx_t_1 = 0;
  __pyx_t_6 = __Pyx_PyObject_to_MemoryviewSlice_ds_int(__pyx_t_5);
  if (unlikely(!__pyx_t_6.memview)) {__pyx_filename = __pyx_f[0]; __pyx_lineno = 12; __pyx_clineno = __LINE__; goto __pyx_L1_error;}
  __Pyx_DECREF(__pyx_t_5); __pyx_t_5 = 0;
  __pyx_v_s = __pyx_t_6;
  __pyx_t_6.memview = NULL;
  __pyx_t_6.data = NULL;
+13:     cdef int i = 0
  __pyx_v_i = 0;
 14:     cdef double x, y
+15:     for i in range(n):
  __pyx_t_7 = __pyx_v_n;
  for (__pyx_t_8 = 0; __pyx_t_8 < __pyx_t_7; __pyx_t_8+=1) {
    __pyx_v_i = __pyx_t_8;
+16:         x = gsl_rng_uniform(r)*2 - 1
    __pyx_v_x = ((gsl_rng_uniform(__pyx_v_46_cython_magic_f0d9ca082c7b25a08598049bfef2323c_r) * 2.0) - 1.0);
+17:         y = gsl_rng_uniform(r)*2 - 1
    __pyx_v_y = ((gsl_rng_uniform(__pyx_v_46_cython_magic_f0d9ca082c7b25a08598049bfef2323c_r) * 2.0) - 1.0);
+18:         s[i] = x**2 + y**2 < 1
    __pyx_t_9 = __pyx_v_i;
    *((int *) ( /* dim=0 */ (__pyx_v_s.data + __pyx_t_9 * __pyx_v_s.strides[0]) )) = ((pow(__pyx_v_x, 2.0) + pow(__pyx_v_y, 2.0)) < 1.0);
  }
+19:     cdef int hits = 0
  __pyx_v_hits = 0;
+20:     for i in range(n):
  __pyx_t_7 = __pyx_v_n;
  for (__pyx_t_8 = 0; __pyx_t_8 < __pyx_t_7; __pyx_t_8+=1) {
    __pyx_v_i = __pyx_t_8;
+21:         hits += s[i]
    __pyx_t_10 = __pyx_v_i;
    __pyx_v_hits = (__pyx_v_hits + (*((int *) ( /* dim=0 */ (__pyx_v_s.data + __pyx_t_10 * __pyx_v_s.strides[0]) ))));
  }
+22:     return 4.0*hits/n
  __Pyx_XDECREF(__pyx_r);
  __pyx_t_5 = PyFloat_FromDouble(((4.0 * __pyx_v_hits) / __pyx_v_n)); if (unlikely(!__pyx_t_5)) {__pyx_filename = __pyx_f[0]; __pyx_lineno = 22; __pyx_clineno = __LINE__; goto __pyx_L1_error;}
  __Pyx_GOTREF(__pyx_t_5);
  __pyx_r = __pyx_t_5;
  __pyx_t_5 = 0;
  goto __pyx_L0;
n = int(1e5)
%timeit pi_python(n)
%timeit pi_numba(n)
%timeit pi_numpy(n)
%timeit pi_cython(n)
10 loops, best of 3: 127 ms per loop
1 loops, best of 3: 146 ms per loop
100 loops, best of 3: 5.18 ms per loop
100 loops, best of 3: 1.95 ms per loop

The bigger the problem, the more scope there is for parallelism

Amdahl’s law says that the speedup from parallelization is bounded by the ratio of parallelizable to irreducibly serial code in the algorithm. However, for big data analysis, Gustafson’s law is more relevant. This says that we are nearly always interested in increasing the size of the parallelizable bits, and the ratio of parallelizable to irreducibly serial code is not a static quantity but depends on data size. For example, Gibbs sampling has an irreducibly serial nature, but for large samples, each iteration may be able to perform PDF evaluations in parallel for zillions of data points.
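A quick numerical illustration of the two bounds (the 90% parallelizable fraction and 8 processors are made-up numbers):

def amdahl_speedup(p, N):
    """Amdahl: speedup with parallelizable fraction p on N processors."""
    return 1.0/((1 - p) + p/N)

def gustafson_speedup(p, N):
    """Gustafson: scaled speedup when the parallel part grows with the data."""
    return N - (1 - p)*(N - 1)

p, N = 0.9, 8
print amdahl_speedup(p, N)     # ~4.7: capped by the 10% serial code
print gustafson_speedup(p, N)  # ~7.3: the serial fraction shrinks as data grows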

Using Multiprocessing

import multiprocessing

num_procs = multiprocessing.cpu_count()
num_procs
4
def pi_multiprocessing(n):
    """Split a job of length n into num_procs pieces."""
    import multiprocessing
    m = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(m)
    # integer division: with "from __future__ import division", n/m would be a float
    results = pool.map(pi_cython, [n//m]*m)
    pool.close()
    return np.mean(results)

For small jobs, the cost of spawning processes dominates

n = int(1e5)
%timeit pi_cython(n)
%timeit pi_multiprocessing(n)
100 loops, best of 3: 1.95 ms per loop
10 loops, best of 3: 32.6 ms per loop

For larger jobs, we see the expected linear speedup

n = int(1e7)
%timeit pi_numpy(n)
%timeit pi_multiprocessing(n)
1 loops, best of 3: 718 ms per loop
10 loops, best of 3: 148 ms per loop

Not all tasks are embarrassingly parallel. In these problems, we need to communicate across parallel workers. There are two ways to do this - via shared memory (exemplar is OpenMP) and by explicit communication mechanisms (exemplar is MPI). Multiprocessing (and GPU computing) can use both mechanisms.

See PyMOTW (Python Module of the Week) for examples of communicating across processes with multiprocessing.
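
As a taste of explicit message passing, here is a minimal producer/consumer sketch with multiprocessing.Queue (the squaring task and the None sentinel protocol are made up for illustration):

from multiprocessing import Process, Queue

def worker(q_in, q_out):
    """Consume tasks from q_in until a None sentinel arrives."""
    while True:
        task = q_in.get()
        if task is None:
            break
        q_out.put(task**2)

q_in, q_out = Queue(), Queue()
p = Process(target=worker, args=(q_in, q_out))
p.start()
for i in range(5):
    q_in.put(i)
q_in.put(None)  # the sentinel tells the worker to stop
print sorted(q_out.get() for i in range(5))
p.join()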

Using shared memory can lead to race conditions

from multiprocessing import Pool, Value, Array, Lock, current_process

n = 4
val = Value('i')
arr = Array('i', n)

val.value = 0
for i in range(n):
    arr[i] = 0

def count1(i):
    """Everyone competes to write to val."""
    val.value += 1

def count2(i):
    """Each process has its own slot in arr to write to."""
    # the pids of a freshly spawned Pool of n workers are typically consecutive,
    # so pid % n maps each worker to a distinct slot
    ix = current_process().pid % n
    arr[ix] += 1

pool = Pool(n)
pool.map(count1, range(1000))
pool.map(count2, range(1000))

pool.close()
print val.value
print sum(arr)
500
1000
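
The lost updates in count1 can be avoided by serializing access. A sketch using the lock that a multiprocessing Value already carries (count3 is a made-up name; correct, but the increments now run serially):

def count3(i):
    """Hold the value's lock while updating, so increments cannot interleave."""
    with val.get_lock():
        val.value += 1

val.value = 0
pool = Pool(n)
pool.map(count3, range(1000))
pool.close()
print val.value  # now reliably 1000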

Using IPython parallel for interactive parallel computing

Start a cluster of workers using the IPython notebook interface. Alternatively, enter

ipcluster start -n 4

at the command line.

from IPython.parallel import Client, interactive

Direct view

rc = Client()
print rc.ids
dv = rc[:]
[0, 1, 2, 3]

When a cell is marked with %%px, all commands in that cell get executed on all engines simultaneously. We’ll use it to load numpy on all engines.

%px import numpy as np

We can refer to individual engines using indexing and slice notation on the client - for example, to set random seeds.

for i, r in enumerate(rc):
    r.execute('np.random.seed(123)')
%%px

np.random.random(3)
Out[0:2]: array([ 0.69646919,  0.28613933,  0.22685145])
Out[1:2]: array([ 0.69646919,  0.28613933,  0.22685145])
Out[2:2]: array([ 0.69646919,  0.28613933,  0.22685145])
Out[3:2]: array([ 0.69646919,  0.28613933,  0.22685145])

Another way to do this is via the scatter operation.

dv.scatter('seed', [1,1,2,2], block=True)
dv['seed']
[[1], [1], [2], [2]]
%%px

np.random.seed(seed)
np.random.random(3)
Out[0:3]: array([ 0.13436424,  0.84743374,  0.76377462])
Out[1:3]: array([ 0.13436424,  0.84743374,  0.76377462])
Out[2:3]: array([ 0.95603427,  0.94782749,  0.05655137])
Out[3:3]: array([ 0.95603427,  0.94782749,  0.05655137])

We set them to different seeds again to do the Monte Carlo integration.

for i, r in enumerate(rc):
    r.execute('np.random.seed(%d)' % i)
%%px

np.random.random(3)
Out[0:4]: array([ 0.5488135 ,  0.71518937,  0.60276338])
Out[1:4]: array([  4.17022005e-01,   7.20324493e-01,   1.14374817e-04])
Out[2:4]: array([ 0.4359949 ,  0.02592623,  0.54966248])
Out[3:4]: array([ 0.5507979 ,  0.70814782,  0.29090474])

We can collect the individual results of remote computation using a dictionary lookup syntax, or use gather to concatenate the results.

%%px

x = np.random.random(3)
dv['x']
[array([ 0.5449,  0.4237,  0.6459]),
 array([ 0.3023,  0.1468,  0.0923]),
 array([ 0.4353,  0.4204,  0.3303]),
 array([ 0.5108,  0.8929,  0.8963])]
dv.gather('x', block=True)
array([ 0.5449,  0.4237,  0.6459,  0.3023,  0.1468,  0.0923,  0.4353,
        0.4204,  0.3303,  0.5108,  0.8929,  0.8963])

Finding \(\pi\) simply involves generating random uniforms on each processor.

%%px
n = int(1e7)
x = np.random.uniform(-1, 1, (n, 2))
n = (x[:, 0]**2 + x[:, 1]**2 < 1).sum()
%precision 8
ns = dv['n']
4*np.sum(ns)/(1e7*len(rc))
3.14143780
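
The same computation can also be pushed to the engines with apply_sync instead of %%px. A hedged sketch: the @interactive decorator (imported above) makes the locally defined pi_hits run in the engines' global namespace, where np is already imported.

@interactive
def pi_hits(n):
    """Count how many of n random points fall inside the unit circle."""
    x = np.random.uniform(-1, 1, (n, 2))
    return (x[:, 0]**2 + x[:, 1]**2 < 1).sum()

n = int(1e7)
hits = dv.apply_sync(pi_hits, n)  # one count comes back per engine
print 4.0*sum(hits)/(n*len(rc))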

In blocking mode (the default), operations on remote engines do not return until all computations are done. For long computations this may be undesirable, and we can ask the engine to return immediately by using a non-blocking operation. In this case, what is returned is an AsyncResult object, which we can query to see whether the computation is complete and, if so, retrieve data from it.

dv.scatter('s', np.arange(16), block=False)
<AsyncResult: scatter>
dv['s']
[array([0, 1, 2, 3]),
 array([4, 5, 6, 7]),
 array([ 8,  9, 10, 11]),
 array([12, 13, 14, 15])]
dv.gather('s')
<AsyncMapResult: gather>
dv.gather('s').get()
array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15])
ar = dv.map_async(lambda x: x+1, range(10))
ar.ready()
False
ar.ready()
False
ar.get()
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

Sometimes the tasks are very unbalanced - some may complete in a short time, while others may take a long time. In this case, using a load-balanced view is more efficient, since it avoids the risk that a single engine gets allocated all the long-running tasks.

lv = rc.load_balanced_view()
def wait(n):
    import time
    time.sleep(n)
    return n

dv['wait'] = wait
intervals = [5,1,1,1,1,1,1,1,1,1,1,1,1,5,5,5]
%%time

ar = dv.map(wait, intervals)
ar.get()
CPU times: user 2.75 s, sys: 723 ms, total: 3.47 s
Wall time: 16 s
%%time

ar = lv.map(wait, intervals, balanced=True)
ar.get()
CPU times: user 1.7 s, sys: 459 ms, total: 2.16 s
Wall time: 9.1 s

We need to %load_ext cythonmagic on every engine, and compile the Cython extension on every engine. The simplest way is to do all this in a %%px cell.

%%px
def python_loop(xs):
    s = 0.0
    for i in range(len(xs)):
        s += xs[i]
    return s
%%px
%load_ext cythonmagic
%%px
%%cython

def cython_loop(double[::1] xs):
    n = xs.shape[0]
    cdef int i
    cdef double s = 0.0
    for i in range(n):
        s += xs[i]
    return s
%%time
%%px
xs = np.random.random(int(1e7))
s = python_loop(xs)
CPU times: user 900 ms, sys: 195 ms, total: 1.1 s
Wall time: 9.12 s
dv['s']
[4999255.51979800, 5001207.17286485, 5000816.40605527, 4999437.17107215]
%%time
%%px
xs = np.random.random(int(1e7))
s = cython_loop(xs)
CPU times: user 37.3 ms, sys: 7.5 ms, total: 44.8 ms
Wall time: 376 ms
dv['s']
[5000927.33063748, 4999180.32360687, 5000671.20938849, 4999140.47559244]

Other parallel programming approaches not covered


%load_ext version_information
%version_information numba, multiprocessing, cython
Software         Version
Python           2.7.9 64bit [GCC 4.2.1 (Apple Inc. build 5577)]
IPython          2.2.0
OS               Darwin 13.4.0 x86_64 i386 64bit
numba            0.17.0
multiprocessing  0.70a1
cython           0.22

Thu Mar 26 16:49:30 2015 EDT