Using ipyparallel¶
Parallel execution is tightly integrated with Jupyter via the ipyparallel package. Install it with
conda install ipyparallel
ipcluster nbextension enable
Starting engines¶
We will only use engines on local cores, which requires no setup - see the docs for detailed instructions on setting up a remote cluster, including using Amazon EC2 clusters.
You can start a cluster from the IPython Clusters tab in the main Jupyter browser window, or from the command line with
ipcluster start -n <put desired number of engines to run here>
The main advantage of developing parallel applications with ipyparallel is that it can be done interactively within Jupyter.
Basic concepts of ipyparallel¶
In [1]:
from ipyparallel import Client
The client connects to the cluster of “remote” engines that perform the actual computation. These engines may be on the same machine or on a cluster.
In [2]:
rc = Client()
In [3]:
rc.ids
Out[3]:
[0, 1, 2, 3]
A view provides access to a subset of the engines available to the client. Jobs are submitted to the engines via the view. A direct view allows the user to explicitly send work to specific engines. The load balanced view is like the Pool object in multiprocessing, and manages the scheduling and distribution of jobs for you.
Direct view
In [4]:
dv = rc[:]
Add 10 sets of 3 numbers in parallel using all engines.
In [5]:
dv.map_sync(lambda x, y, z: x + y + z, range(10), range(10), range(10))
Out[5]:
[0, 3, 6, 9, 12, 15, 18, 21, 24, 27]
Add 10 sets of 3 numbers in parallel using only alternate engines.
In [6]:
rc[::2].map_sync(lambda x, y, z: x + y + z, range(10), range(10), range(10))
Out[6]:
[0, 3, 6, 9, 12, 15, 18, 21, 24, 27]
Add 10 sets of 3 numbers using a specific engine.
In [7]:
rc[2].map_sync(lambda x, y, z: x + y + z, range(10), range(10), range(10))
Out[7]:
[0, 3, 6, 9, 12, 15, 18, 21, 24, 27]
Load balanced view
Use this when you have many jobs that take different amounts of time to complete.
In [75]:
lv = rc.load_balanced_view()
In [80]:
lv.map_sync(lambda x: sum(x), np.random.random((10, 100000)))
Out[80]:
[50261.04884692176,
49966.438133877964,
49825.131766711958,
50131.890397676114,
49939.572135256865,
50162.518589135783,
50065.751713594087,
49922.903432015002,
49983.505820534752,
49942.245237953692]
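The example above uses jobs of equal cost. As a minimal illustrative sketch (uneven_job and costs are made-up names, not from the original notebook), here the jobs have uneven runtimes and the load-balanced scheduler hands each job to whichever engine is free:
In [ ]:
def uneven_job(t):
    # import on the engine itself - see "Functions with dependencies" below
    import time
    time.sleep(t)    # pretend this is t seconds of real work
    return t

# a mix of short and long jobs; the scheduler assigns them as engines free up
costs = [0.1, 1.0, 0.1, 0.1, 1.0, 0.1, 0.1, 0.1]
lv.map_sync(uneven_job, costs)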
Calling functions with apply¶
In contrast to map, apply is just a simple function call run on all remote engines, and has the usual function signature apply(f, *args, **kwargs). It is a primitive on which more useful functions (such as map) are built.
In [8]:
rc[1:3].apply_sync(lambda x, y: x**2 + y**2, 3, 4)
Out[8]:
[25, 25]
In [9]:
rc[1:3].apply_sync(lambda x, y: x**2 + y**2, x=3, y=4)
Out[9]:
[25, 25]
Synchronous and asynchronous jobs¶
We have used the map_sync and apply_sync methods. The sync suffix indicates that we want to run a synchronous job. Synchronous jobs block until all the computation is done and return the result.
In [10]:
res = dv.map_sync(lambda x, y, z: x + y + z, range(10), range(10), range(10))
In [11]:
res
Out[11]:
[0, 3, 6, 9, 12, 15, 18, 21, 24, 27]
In contrast, asynchronous jobs return immediately so that you can do other work, but return an AsyncMapResult object, similar to the future object returned by the concurrent.futures package. You can query its status, cancel running jobs, and retrieve results once they have been computed.
In [12]:
res = dv.map_async(lambda x, y, z: x + y + z, range(10), range(10), range(10))
In [13]:
res
Out[13]:
<AsyncMapResult: <lambda>>
In [14]:
res.done()
Out[14]:
True
In [15]:
res.get()
Out[15]:
[0, 3, 6, 9, 12, 15, 18, 21, 24, 27]
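Besides done and get, the asynchronous result object exposes other standard ways to monitor a running job; a short sketch:
In [ ]:
res = dv.map_async(lambda x: x**2, range(10))
res.wait(timeout=1)    # block for at most 1 second
res.ready()            # True once all chunks have completed
res.progress           # number of chunks completed so far
res.get()              # retrieve the results (blocks until they are done)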
There is also a map method that uses asynchronous mode by default, but you can change this by setting the block attribute or function argument.
In [16]:
res = dv.map(lambda x, y, z: x + y + z, range(10), range(10), range(10))
In [17]:
res.get()
Out[17]:
[0, 3, 6, 9, 12, 15, 18, 21, 24, 27]
Change blocking mode for just one job.
In [18]:
res = dv.map(lambda x, y, z: x + y + z, range(10), range(10), range(10), block=True)
In [19]:
res
Out[19]:
[0, 3, 6, 9, 12, 15, 18, 21, 24, 27]
Change blocking mode for this view so that all jobs are synchronous.
In [20]:
dv.block = True
In [21]:
res = dv.map(lambda x, y, z: x + y + z, range(10), range(10), range(10))
In [22]:
res
Out[22]:
[0, 3, 6, 9, 12, 15, 18, 21, 24, 27]
Remote function decorators¶
The @remote decorator results in functions that will execute simultaneously on all engines in a view. For example, you can use this decorator if you always want to run \(n\) independent parallel MCMC chains.
In [23]:
@dv.remote(block = True)
def f1(n):
return np.random.rand(n)
In [24]:
f1(4)
Out[24]:
[array([ 0.14231309, 0.85848692, 0.49302051, 0.87264134]),
array([ 0.80395014, 0.74717739, 0.23667959, 0.94655422]),
array([ 0.72474235, 0.23430899, 0.55128161, 0.40699965]),
array([ 0.47586201, 0.11789862, 0.63038679, 0.63673176])]
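As a hypothetical sketch of the MCMC use case mentioned above (mcmc_chain is not part of the original notebook), each engine runs one independent random-walk Metropolis chain targeting a standard normal:
In [ ]:
@dv.remote(block=True)
def mcmc_chain(niter=1000):
    import numpy as np    # each engine needs its own import
    x = 0.0
    samples = []
    for _ in range(niter):
        x_new = x + np.random.normal(0, 0.5)
        # Metropolis accept/reject step for a standard normal target
        if np.log(np.random.rand()) < 0.5*(x**2 - x_new**2):
            x = x_new
        samples.append(x)
    return np.array(samples)

chains = mcmc_chain(1000)    # one independent chain per engine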
The @parallel decorator breaks up elementwise operations and distributes them.
In [25]:
@dv.parallel(block = True)
def f2(x):
return x
In [26]:
f2(range(15))
Out[26]:
[range(0, 4), range(4, 8), range(8, 12), range(12, 15)]
In [27]:
@dv.parallel(block = True)
def f3(x):
return sum(x)
In [28]:
f3(range(15))
Out[28]:
[6, 22, 38, 39]
In [29]:
@dv.parallel(block = True)
def f4(x, y):
return x + y
In [30]:
f4(np.arange(10), np.arange(10))
Out[30]:
array([ 0, 2, 4, 6, 8, 10, 12, 14, 16, 18])
Example: Use the @parallel decorator to speed up Mandelbrot calculations¶
In [31]:
def mandel1(x, y, max_iters=80):
c = complex(x, y)
z = 0.0j
for i in range(max_iters):
z = z*z + c
if z.real*z.real + z.imag*z.imag >= 4:
return i
return max_iters
In [32]:
@dv.parallel(block = True)
def mandel2(x, y, max_iters=80):
c = complex(x, y)
z = 0.0j
for i in range(max_iters):
z = z*z + c
if z.real*z.real + z.imag*z.imag >= 4:
return i
return max_iters
In [33]:
x = np.arange(-2, 1, 0.01)
y = np.arange(-1, 1, 0.01)
X, Y = np.meshgrid(x, y)
In [34]:
%%time
im1 = np.reshape(list(map(mandel1, X.ravel(), Y.ravel())), (len(y), len(x)))
CPU times: user 1.02 s, sys: 5.2 ms, total: 1.02 s
Wall time: 1.02 s
In [35]:
%%time
im2 = np.reshape(mandel2.map(X.ravel(), Y.ravel()), (len(y), len(x)))
CPU times: user 28.2 ms, sys: 8.84 ms, total: 37 ms
Wall time: 408 ms
In [36]:
fig, axes = plt.subplots(1, 2, figsize=(12, 4))
axes[0].grid(False)
axes[0].imshow(im1, cmap='jet')
axes[1].grid(False)
axes[1].imshow(im2, cmap='jet')
pass
Functions with dependencies¶
Modules imported locally are NOT available in the remote engines.
In [37]:
import time
import datetime
In [38]:
def g1(x):
time.sleep(0.1)
now = datetime.datetime.now()
return (now, x)
This fails with an Exception because the time and datetime modules are not imported in the remote engines.
dv.map_sync(g1, range(10))
The simplest fix is to import the module(s) within the function.
In [39]:
def g2(x):
import time, datetime
time.sleep(0.1)
now = datetime.datetime.now()
return (now, x)
In [40]:
dv.map_sync(g2, range(5))
Out[40]:
[(datetime.datetime(2016, 4, 9, 11, 54, 40, 389541), 0),
(datetime.datetime(2016, 4, 9, 11, 54, 40, 493559), 1),
(datetime.datetime(2016, 4, 9, 11, 54, 40, 388099), 2),
(datetime.datetime(2016, 4, 9, 11, 54, 40, 389526), 3),
(datetime.datetime(2016, 4, 9, 11, 54, 40, 389520), 4)]
Alternatively, you can simultaneously import both locally and in the remote engines with the sync_imports context manager.
In [41]:
with dv.sync_imports():
import time
import datetime
importing time on engine(s)
importing datetime on engine(s)
Now the g1 function will work.
In [42]:
dv.map_sync(g1, range(5))
Out[42]:
[(datetime.datetime(2016, 4, 9, 11, 54, 40, 645512), 0),
(datetime.datetime(2016, 4, 9, 11, 54, 40, 749825), 1),
(datetime.datetime(2016, 4, 9, 11, 54, 40, 641631), 2),
(datetime.datetime(2016, 4, 9, 11, 54, 40, 641632), 3),
(datetime.datetime(2016, 4, 9, 11, 54, 40, 642624), 4)]
Finally, there is also a require decorator that can be used. This will force the remote engine to import all packages given.
In [43]:
from ipyparallel import require
In [44]:
@require('scipy.stats')
def g3(x):
return scipy.stats.norm(0,1).pdf(x)
In [45]:
dv.map(g3, np.arange(-3, 4))
Out[45]:
[0.0044318484119380075,
0.053990966513188063,
0.24197072451914337,
0.3989422804014327,
0.24197072451914337,
0.053990966513188063,
0.0044318484119380075]
Moving data around¶
We can send data to the remote engines with push and retrieve it with pull, or use the dictionary interface. For example, you can use this to distribute a large lookup table to all engines once instead of passing it repeatedly as a function argument.
In [46]:
dv.push(dict(a=3, b=2))
Out[46]:
[None, None, None, None]
In [47]:
def f(x):
global a, b
return a*x + b
In [48]:
dv.map_sync(f, range(5))
Out[48]:
[2, 5, 8, 11, 14]
In [49]:
dv.pull(('a', 'b'))
Out[49]:
[[3, 2], [3, 2], [3, 2], [3, 2]]
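As a sketch of the lookup-table use case mentioned above (lookup and use_table are illustrative names), we push the table once and then resolve it as a global name on the engines:
In [ ]:
lookup = {i: i**2 for i in range(100)}    # hypothetical lookup table
dv.push(dict(lookup=lookup))

def use_table(k):
    return lookup[k]    # resolved in each engine's namespace

dv.map_sync(use_table, range(5))    # [0, 1, 4, 9, 16]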
You can also use the dictionary interface as an alternative to push and pull¶
In [50]:
dv['c'] = 5
In [51]:
dv['a']
Out[51]:
[3, 3, 3, 3]
In [52]:
dv['c']
Out[52]:
[5, 5, 5, 5]
Working with compiled code¶
Using numba.jit is straightforward.
In [53]:
with dv.sync_imports():
import numba
importing numba on engine(s)
In [54]:
@numba.jit
def f_numba(x):
return np.sum(x)
In [55]:
dv.map(f_numba, np.random.random((6, 4)))
Out[55]:
[1.4260310583057128,
2.743483501632218,
3.009397757283332,
1.145951696998727,
1.7508418471631069,
2.75037418092905]
We need to do some extra work to make sure the shared library compiled with Cython is available to the remote engines:
- Compile a named shared module with the -n flag
- Use np.ndarray[dtype, ndim] in place of memory views - for example, double[:] becomes np.ndarray[np.float64_t, ndim=1]
- Move the shared library to the site-packages directory (Cython magic modules can be found in ~/.ipython/cython)
- Import the module remotely in the usual way
In [56]:
%load_ext cython
In [57]:
%%cython -n cylib
import cython
import numpy as np
cimport numpy as np
@cython.boundscheck(False)
@cython.wraparound(False)
def f(np.ndarray[np.float64_t, ndim=1] x):
x.setflags(write=True)
cdef int i
cdef int n = x.shape[0]
cdef double s = 0
for i in range(n):
s += x[i]
return s
Copy the compiled module into site-packages so that the remote engines can import it¶
In [58]:
import glob
import os
import site
import shutil
src = glob.glob(os.path.join(os.path.expanduser('~/'), '.ipython', 'cython', 'cylib*so'))[0]
dst = site.getsitepackages()[0]
shutil.copy(src, dst)
Out[58]:
'/Users/cliburn/anaconda2/envs/p3/lib/python3.5/site-packages/cylib.cpython-35m-darwin.so'
In [59]:
with dv.sync_imports():
import cylib
importing cylib on engine(s)
In [60]:
dv.map(cylib.f, np.random.random((6, 4)))
Out[60]:
[2.017269882929705,
2.0551330836787485,
1.7354664085414502,
3.2959252645109527,
1.6032104169868933,
0.265696865060396]
Using parallel magic commands¶
In practice, most users will simply use the %px magic to execute code in parallel from within the notebook. This is the simplest way to use ipyparallel.
This sends the command to all targeted engines.
In [61]:
%px a = np.random.random(4)
%px a.sum()
Out[0:2]: 2.9214397362530504
Out[1:2]: 1.2663235592392268
Out[2:2]: 1.6476882945611011
Out[3:2]: 2.5324941555165084
List comprehensions in parallel¶
The scatter method partitions and distributes data to all engines. The gather method does the reverse. Together with %px, we can simulate parallel list comprehensions.
In [72]:
dv.scatter('a', np.random.randint(0, 10, 10))
%px print(a)
[stdout:0] [5 9 2]
[stdout:1] [5 9 5]
[stdout:2] [2 7]
[stdout:3] [6 4]
In [73]:
dv.gather('a')
Out[73]:
array([5, 9, 2, 5, 9, 5, 2, 7, 6, 4])
In [74]:
dv.scatter('xs', range(24))
%px y = [x**2 for x in xs]
np.array(dv.gather('y'))
Out[74]:
array([ 0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144,
169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529])
Running magic functions in parallel¶
In [63]:
%%px --target [1,3]
%matplotlib inline
import seaborn as sns
x = np.random.normal(np.random.randint(-10, 10), 1, 100)
sns.kdeplot(x);
[output:1]
[output:3]
Running in non-blocking mode¶
In [64]:
%%px --target [1,3] --noblock
%matplotlib inline
import seaborn as sns
x = np.random.normal(np.random.randint(-10, 10), 1, 100)
sns.kdeplot(x);
Out[64]:
<AsyncResult: execute>
In [65]:
%pxresult
[output:1]
[output:3]
Interacting with individual engines¶
We can interact with individual engines by opening a %qtconsole for each engine.
In [66]:
%px %qtconsole
Try looking at some of the variables defined in earlier cells - e.g. a, x, y etc. Change the values directly in the console, then pull them back into the notebook interface. Are the changes preserved?
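For example, after editing a in one of the engine consoles, you could check from the notebook with something like:
In [ ]:
dv.pull('a')    # the returned list reflects any per-engine modifications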