Using ipyparallel

Parallel execution is tightly integrated with Jupyter in the ipyparallel package. Install with

conda install ipyparallel
ipcluster nbextension enable

Official documentation

In [1]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt

Starting engines

We will only use engines on local cores, which requires no setup; see the docs for detailed instructions on how to set up a remote cluster, including using Amazon EC2 clusters.

You can start a cluster on the IPython Clusters tab in the main Jupyter browser window or via the command line with

ipcluster start -n <put desired number of engines to run here>

The main advantage of developing parallel applications using ipyparallel is that it can be done interactively within Jupyter.

Basic concepts of ipyparallel

In [2]:
from ipyparallel import Client

The client connects to the cluster of “remote” engines that perform the actual computation. These engines may be on the same machine or on a cluster.

In [3]:
rc = Client()
In [4]:
rc.ids
Out[4]:
[0, 1, 2, 3, 4, 5, 6, 7]

A view provides access to a subset of the engines available to the client. Jobs are submitted to the engines via the view. A direct view allows the user to explicitly send work to specific engines. The load-balanced view is like the Pool object in multiprocessing, and manages the scheduling and distribution of jobs for you.

Direct view

In [5]:
dv = rc[:]

Add 10 sets of 3 numbers in parallel using all engines.

In [6]:
dv.map_sync(lambda x, y, z: x + y + z, range(10), range(10), range(10))
Out[6]:
[0, 3, 6, 9, 12, 15, 18, 21, 24, 27]

Add 10 sets of 3 numbers in parallel using only alternate engines.

In [7]:
rc[::2].map_sync(lambda x, y, z: x + y + z, range(10), range(10), range(10))
Out[7]:
[0, 3, 6, 9, 12, 15, 18, 21, 24, 27]

Add 10 sets of 3 numbers using a specific engine.

In [8]:
rc[2].map_sync(lambda x, y, z: x + y + z, range(10), range(10), range(10))
Out[8]:
[0, 3, 6, 9, 12, 15, 18, 21, 24, 27]

Load balanced view

Use this when you have many jobs that take different amounts of time to complete.

In [9]:
lv = rc.load_balanced_view()
In [10]:
lv.map_sync(lambda x: sum(x), np.random.random((10, 100000)))
Out[10]:
[49943.055545684554,
 49958.361259726298,
 49864.353608090729,
 50138.070719165735,
 49948.175168188893,
 49988.083010381451,
 50019.95202007662,
 49935.980196889475,
 50126.055843775182,
 50015.903549302966]
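
The sums above all take roughly the same time per task. Below is a minimal sketch of the case the load-balanced view is really designed for, with deliberately uneven task durations (uneven_task and the sleep times are illustrative, not part of the original example):

def uneven_task(t):
    import time               # imported inside the function so each engine has it
    time.sleep(t)             # simulate work of varying duration
    return t

lv.map_sync(uneven_task, [0.1, 0.5, 0.2, 1.0, 0.05])

The scheduler hands each task to whichever engine is free, so short tasks do not have to queue behind long ones on a fixed engine.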

Calling functions with apply

In contrast to map, apply is just a simple function call run on all remote engines, and has the usual function signature apply(f, *args, **kwargs). It is a primitive on which other, more useful functions (such as map) are built.

In [11]:
rc[1:3].apply_sync(lambda x, y: x**2 + y**2, 3, 4)
Out[11]:
[25, 25]
In [12]:
rc[1:3].apply_sync(lambda x, y: x**2 + y**2, x=3, y=4)
Out[12]:
[25, 25]

Synchronous and asynchronous jobs

We have used the map_sync and apply_sync methods. The sync suffix indicates that we want to run a synchronous job. Synchronous jobs block until all the computation is done and return the results.

In [13]:
res = dv.map_sync(lambda x, y, z: x + y + z, range(10), range(10), range(10))
In [14]:
res
Out[14]:
[0, 3, 6, 9, 12, 15, 18, 21, 24, 27]

In contrast, asynchronous jobs return immediately so that you can do other work, handing back an AsyncMapResult object, similar to the Future objects returned by the concurrent.futures package. You can query its status, cancel pending jobs, and retrieve results once they have been computed.

In [15]:
res = dv.map_async(lambda x, y, z: x + y + z, range(10), range(10), range(10))
In [16]:
res
Out[16]:
<AsyncMapResult: <lambda>>
In [17]:
res.done()
Out[17]:
True
In [18]:
res.get()
Out[18]:
[0, 3, 6, 9, 12, 15, 18, 21, 24, 27]
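
Beyond done and get, the returned object supports the usual AsyncResult bookkeeping. A hedged sketch (ready, wait and abort are standard AsyncResult methods, but check the ipyparallel version you have installed):

ar = dv.map_async(lambda x: x**2, range(10))
ar.ready()           # have all tasks finished?
ar.wait(timeout=1)   # block for at most one second
# ar.abort()         # abort tasks that have not started running yet
ar.get()             # block until done and return the results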

There is also a map method that runs asynchronously by default; you can change this by setting the view's block attribute or by passing the block argument to the call.

In [19]:
res = dv.map(lambda x, y, z: x + y + z, range(10), range(10), range(10))
In [20]:
res.get()
Out[20]:
[0, 3, 6, 9, 12, 15, 18, 21, 24, 27]

Change blocking mode for just one job.

In [21]:
res = dv.map(lambda x, y, z: x + y + z, range(10), range(10), range(10), block=True)
In [22]:
res
Out[22]:
[0, 3, 6, 9, 12, 15, 18, 21, 24, 27]

Change blocking mode for this view so that all jobs are synchronous.

In [23]:
dv.block = True
In [24]:
res = dv.map(lambda x, y, z: x + y + z, range(10), range(10), range(10))
In [25]:
res
Out[25]:
[0, 3, 6, 9, 12, 15, 18, 21, 24, 27]

Remote function decorators

The @remote decorator results in functions that execute simultaneously on all engines in a view. For example, you can use this decorator if you always want to run n independent MCMC chains in parallel.

In [26]:
with dv.sync_imports():
    import numpy
importing numpy on engine(s)
In [27]:
@dv.remote(block = True)
def f1(n):
    return numpy.random.rand(n)
In [28]:
f1(4)
Out[28]:
[array([ 0.33832753,  0.23342134,  0.09817073,  0.51060264]),
 array([ 0.24919434,  0.61226489,  0.1709979 ,  0.92716651]),
 array([ 0.19053364,  0.35857299,  0.09015628,  0.32347737]),
 array([ 0.7697255 ,  0.57345448,  0.10060588,  0.3875056 ]),
 array([ 0.89363384,  0.70684507,  0.27046639,  0.12688668]),
 array([ 0.75036068,  0.2489435 ,  0.3555557 ,  0.52555086]),
 array([ 0.94484292,  0.12537997,  0.94156442,  0.10110875]),
 array([ 0.49883145,  0.76342079,  0.51813558,  0.54732924])]
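
A hedged sketch of the MCMC use case mentioned above: each engine runs its own random-walk Metropolis chain targeting a standard normal distribution (run_chain, n_samples and the proposal scale are illustrative choices, not from the original text).

@dv.remote(block=True)
def run_chain(n_samples=1000):
    import numpy as np
    x = 0.0
    samples = np.empty(n_samples)
    for i in range(n_samples):
        proposal = x + np.random.normal(scale=0.5)
        # Metropolis accept/reject step for a standard normal target
        if np.log(np.random.rand()) < 0.5 * (x**2 - proposal**2):
            x = proposal
        samples[i] = x
    return samples

chains = run_chain(1000)   # one chain (array of samples) per engine in the view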

The @parallel decorator breaks up elementwise operations and distributes them.

In [29]:
@dv.parallel(block = True)
def f2(x):
    return x
In [30]:
f2(range(15))
Out[30]:
[range(0, 2),
 range(2, 4),
 range(4, 6),
 range(6, 8),
 range(8, 10),
 range(10, 12),
 range(12, 14),
 range(14, 15)]
In [31]:
@dv.parallel(block = True)
def f3(x):
    return sum(x)
In [32]:
f3(range(15))
Out[32]:
[1, 5, 9, 13, 17, 21, 25, 14]
In [33]:
@dv.parallel(block = True)
def f4(x, y):
    return x + y
In [34]:
f4(np.arange(10), np.arange(10))
Out[34]:
array([ 0,  2,  4,  6,  8, 10, 12, 14, 16, 18])

Example: Use the @parallel decorator to speed up Mandelbrot calculations

In [35]:
def mandel1(x, y, max_iters=80):
    c = complex(x, y)
    z = 0.0j
    for i in range(max_iters):
        z = z*z + c
        if z.real*z.real + z.imag*z.imag >= 4:
            return i
    return max_iters
In [36]:
@dv.parallel(block = True)
def mandel2(x, y, max_iters=80):
    c = complex(x, y)
    z = 0.0j
    for i in range(max_iters):
        z = z*z + c
        if z.real*z.real + z.imag*z.imag >= 4:
            return i
    return max_iters
In [37]:
x = np.arange(-2, 1, 0.01)
y = np.arange(-1, 1, 0.01)
X, Y = np.meshgrid(x, y)
In [38]:
%%time
im1 = np.reshape(list(map(mandel1, X.ravel(), Y.ravel())), (len(y), len(x)))
CPU times: user 536 ms, sys: 4 ms, total: 540 ms
Wall time: 547 ms
In [39]:
%%time
im2 = np.reshape(mandel2.map(X.ravel(), Y.ravel()),  (len(y), len(x)))
CPU times: user 40 ms, sys: 0 ns, total: 40 ms
Wall time: 156 ms
In [40]:
fig, axes = plt.subplots(1, 2, figsize=(12, 4))
axes[0].grid(False)
axes[0].imshow(im1, cmap='jet')
axes[1].grid(False)
axes[1].imshow(im2, cmap='jet')
pass
../_images/notebooks_S14D_IPyParallel_64_0.png

Functions with dependencies

Modules imported locally are NOT available in the remote engines.

In [41]:
import time
import datetime
In [42]:
def g1(x):
    time.sleep(0.1)
    now = datetime.datetime.now()
    return (now, x)

This fails with an Exception because the time and datetime modules are not imported in the remote engines.

dv.map_sync(g1, range(10))

The simplest fix is to import the module(s) within the function.

In [43]:
def g2(x):
    import time, datetime
    time.sleep(0.1)
    now = datetime.datetime.now()
    return (now, x)
In [44]:
dv.map_sync(g2, range(5))
Out[44]:
[(datetime.datetime(2018, 4, 3, 20, 33, 10, 207910), 0),
 (datetime.datetime(2018, 4, 3, 20, 33, 10, 209498), 1),
 (datetime.datetime(2018, 4, 3, 20, 33, 10, 209653), 2),
 (datetime.datetime(2018, 4, 3, 20, 33, 10, 209659), 3),
 (datetime.datetime(2018, 4, 3, 20, 33, 10, 209884), 4)]

Alternatively, you can import modules both locally and on the remote engines at the same time with the sync_imports context manager.

In [45]:
with dv.sync_imports():
    import time
    import datetime
importing time on engine(s)
importing datetime on engine(s)

Now the g1 function will work.

In [46]:
dv.map_sync(g1, range(5))
Out[46]:
[(datetime.datetime(2018, 4, 3, 20, 33, 10, 396045), 0),
 (datetime.datetime(2018, 4, 3, 20, 33, 10, 395993), 1),
 (datetime.datetime(2018, 4, 3, 20, 33, 10, 396044), 2),
 (datetime.datetime(2018, 4, 3, 20, 33, 10, 397792), 3),
 (datetime.datetime(2018, 4, 3, 20, 33, 10, 398626), 4)]

Finally, there is also a require decorator, which forces the remote engines to import the given packages before running the function.

In [47]:
from ipyparallel import require
In [48]:
@require('scipy.stats')
def g3(x):
    return scipy.stats.norm(0,1).pdf(x)
In [49]:
dv.map(g3, np.arange(-3, 4))
Out[49]:
[0.0044318484119380075,
 0.053990966513188063,
 0.24197072451914337,
 0.3989422804014327,
 0.24197072451914337,
 0.053990966513188063,
 0.0044318484119380075]

Moving data around

We can send data to the remote engines with push and retrieve it with pull, or use the dictionary interface. For example, you can use this to distribute a large lookup table to all engines once, instead of passing it repeatedly as a function argument.

In [50]:
dv.push(dict(a=3, b=2))
Out[50]:
[None, None, None, None, None, None, None, None]
In [51]:
def f(x):
    global a, b
    return a*x + b
In [52]:
dv.map_sync(f, range(5))
Out[52]:
[2, 5, 8, 11, 14]
In [53]:
dv.pull(('a', 'b'))
Out[53]:
[[3, 2], [3, 2], [3, 2], [3, 2], [3, 2], [3, 2], [3, 2], [3, 2]]

You can also use the dictionary interface as an alternative to push and pull.

In [54]:
dv['c'] = 5
In [55]:
dv['a']
Out[55]:
[3, 3, 3, 3, 3, 3, 3, 3]
In [56]:
dv['c']
Out[56]:
[5, 5, 5, 5, 5, 5, 5, 5]
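
A hedged sketch of the lookup-table idea mentioned at the start of this section: push a dictionary to the engines once, then reference it as a global inside the mapped function (prices and total_cost are illustrative names).

dv.push(dict(prices={'apple': 2, 'pear': 3, 'fig': 5}))

def total_cost(basket):
    global prices
    return sum(prices[item] for item in basket)

dv.map_sync(total_cost, [['apple'], ['apple', 'pear'], ['fig', 'fig']])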

Working with compiled code

Using numba.jit is straightforward; a minimal sketch is given below.
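
A minimal sketch, assuming numba is installed on every engine. Keeping the numba import and the @jit decoration inside a plain wrapper function means the wrapper can be shipped to the engines like any ordinary function, with each engine compiling its own copy (busy_sum is an illustrative name; the kernel is re-jitted on every call purely to keep the example short).

def busy_sum(x):
    from numba import jit

    @jit(nopython=True)
    def kernel(x):
        # plain loop that numba compiles to machine code
        s = 0.0
        for i in range(x.shape[0]):
            s += x[i]
        return s

    return kernel(x)

dv.map_sync(busy_sum, np.random.random((6, 4)))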

We need to do some extra work to make sure the shared library compiled with Cython is available to the remote engines:

  • Compile a named shared module with the -n flag
  • Use np.ndarray[dtype, ndim] in place of memory views
    • for example, double[:] becomes np.ndarray[np.float64_t, ndim=1]
  • Move the shared library to the site-packages directory
    • Cython magic modules can be found in ~/.cache/ipython/cython
  • Import the module remotely in the usual way
In [57]:
%load_ext cython
In [58]:
%%cython -n cylib

import cython
import numpy as np
cimport numpy as np

@cython.boundscheck(False)
@cython.wraparound(False)
def f(np.ndarray[np.float64_t, ndim=1] x):
    x.setflags(write=True)
    cdef int i
    cdef int n = x.shape[0]
    cdef double s = 0

    for i in range(n):
        s += x[i]
    return s

Copy the compiled module into site-packages so that the remote engines can import it.

In [59]:
import os
import glob
import site
import shutil
src = glob.glob(os.path.join(os.path.expanduser('~/'), '.cache', 'ipython', 'cython', 'cylib*so'))[0]
dst = site.getsitepackages()[0]
shutil.copy(src, dst)
Out[59]:
'/opt/conda/lib/python3.6/site-packages/cylib.cpython-36m-x86_64-linux-gnu.so'
In [60]:
with dv.sync_imports():
    import cylib
importing cylib on engine(s)

With cylib importable on every engine, the compiled function can now be mapped like any other Python function.

In [61]:
dv.map(cylib.f, np.random.random((6, 4)))
Out[61]:
[1.8825694893878855,
 1.3291781855139693,
 1.5788796473521334,
 2.8365779834827047,
 1.7491894709462499,
 0.8538153083687106]

Using parallel magic commands

In practice, most users will simply use the %px magic to execute code in parallel from within the notebook. This is the simplest way to use ipyparallel. The %px magic sends the command to all targeted engines.

In [62]:
%px import numpy as np
%px a = np.random.random(4)
%px a.sum()
Out[0:3]: 1.4108615162007481
Out[1:3]: 2.307313190289948
Out[2:3]: 2.7734346738281523
Out[3:3]: 1.2018443672156747
Out[4:3]: 1.4850212778476148
Out[5:3]: 1.7126681481007608
Out[6:3]: 2.1182406999813619
Out[7:3]: 1.5880181901926447

List comprehensions in parallel

The scatter method partitions and distributes data to all engines. The gather method does the reverse. Together with %px, we can simulate parallel list comprehensions.

In [63]:
dv.scatter('a', np.random.randint(0, 10, 10))
%px print(a)
[stdout:0] [0 9]
[stdout:1] [6 9]
[stdout:2] [9]
[stdout:3] [5]
[stdout:4] [7]
[stdout:5] [9]
[stdout:6] [1]
[stdout:7] [5]
In [64]:
dv.gather('a')
Out[64]:
array([0, 9, 6, 9, 9, 5, 7, 9, 1, 5])
In [65]:
dv.scatter('xs', range(24))
%px y = [x**2 for x in xs]
np.array(dv.gather('y'))
Out[65]:
array([  0,   1,   4,   9,  16,  25,  36,  49,  64,  81, 100, 121, 144,
       169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529])

Running magic functions in parallel

In [66]:
%%px --target [1,3]
%matplotlib inline
import seaborn as sns
x = np.random.normal(np.random.randint(-10, 10), 1, 100)
sns.kdeplot(x);
[output:1]
../_images/notebooks_S14D_IPyParallel_112_1.png
[output:3]
../_images/notebooks_S14D_IPyParallel_112_3.png

Running in non-blocking mode

In [67]:
%%px --target [1,3] --noblock
%matplotlib inline
import seaborn as sns
x = np.random.normal(np.random.randint(-10, 10), 1, 100)
sns.kdeplot(x);
Out[67]:
<AsyncResult: execute>
In [68]:
%pxresult
[output:1]
../_images/notebooks_S14D_IPyParallel_115_1.png
[output:3]
../_images/notebooks_S14D_IPyParallel_115_3.png

Interacting with individual engines

We can interact with individual engines by using %px to open a %qtconsole for each engine.

In [69]:
%px %qtconsole

Try looking at some of the variables defined in earlier cells, e.g. a, x, y. Change their values directly in a console, then pull them back into the notebook interface. Are the changes preserved?