# Multi-Core Parallelism

```
In [1]:
```

```
%load_ext cython
```

```
In [48]:
```

```
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
import os
```

```
In [3]:
```

```
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import multiprocessing as mp
from multiprocessing import Pool, Value, Array
import time
from numba import njit
```

## Vanilla Python

```
In [4]:
```

```
def mc_pi(n):
    # Monte Carlo estimate of pi from n random points in [-1, 1] x [-1, 1]
    s = 0
    for i in range(n):
        x = np.random.uniform(-1, 1)
        y = np.random.uniform(-1, 1)
        if (x**2 + y**2) < 1:
            s += 1
    return 4*s/n
```

```
In [5]:
```

```
%%time
res = [mc_pi(int(1e5)) for i in range(10)]
```

```
CPU times: user 2.12 s, sys: 4 ms, total: 2.13 s
Wall time: 2.14 s
```

## Using `numba` to speed up computation

```
In [6]:
```

```
@njit()
def mc_pi_numba(n):
    s = 0
    for i in range(n):
        x = np.random.uniform(-1, 1)
        y = np.random.uniform(-1, 1)
        if (x**2 + y**2) < 1:
            s += 1
    return 4*s/n
```

```
In [7]:
```

```
%%time
res = [mc_pi_numba(int(1e7)) for i in range(10)]
```

```
CPU times: user 1.63 s, sys: 20 ms, total: 1.65 s
Wall time: 1.65 s
```

```
In [8]:
```

```
np.array(res)
```

```
Out[8]:
```

```
array([ 3.1408772, 3.1410464, 3.1416584, 3.141608 , 3.1412532,
3.141702 , 3.141554 , 3.1422448, 3.1413172, 3.1411456])
```

## Using `cython` to speed up computation

```
In [9]:
```

```
%%cython
import cython
from libc.stdlib cimport rand

# INT_MAX is used as the upper bound of rand(); this matches RAND_MAX on glibc
cdef extern from "limits.h":
    int INT_MAX

@cython.cdivision(True)
def mc_pi_cython(int n):
    cdef double s = 0.0
    cdef double x, y
    cdef int i
    for i in range(n):
        x = 2*(rand()/float(INT_MAX)) - 1
        y = 2*(rand()/float(INT_MAX)) - 1
        if (x**2 + y**2) < 1:
            s += 1
    return 4*s/n
```

```
In [10]:
```

```
%%time
res = [mc_pi_cython(int(1e7)) for i in range(10)]
```

```
CPU times: user 4.2 s, sys: 16 ms, total: 4.22 s
Wall time: 4.21 s
```

```
In [11]:
```

```
np.array(res)
```

```
Out[11]:
```

```
array([ 3.1422556, 3.1427648, 3.142728 , 3.141858 , 3.1406612,
3.141878 , 3.1413508, 3.1413644, 3.1408592, 3.1417288])
```

## The `concurrent.futures` module

Concurrent tasks are tasks that return the same results regardless of
the order in which they are executed. A “future” is an object that will
yield a result at some later time. The `concurrent.futures` module
provides executors, to which functions can be submitted for future
execution. This gives us a simple model for parallel execution on a
multi-core machine.

While `concurrent.futures` provides a simpler interface, it is slower
and less flexible than using `multiprocessing` directly for parallel
execution.
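As a minimal sketch of the submit-then-collect model described above: `submit` returns a `Future` immediately, and `as_completed` yields each future once its result is ready. The `square` helper is hypothetical, and a thread pool is used here only so the snippet runs anywhere; `ProcessPoolExecutor` offers the same interface.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def square(x):
    # Hypothetical stand-in for real work
    return x * x

with ThreadPoolExecutor(max_workers=4) as pool:
    # submit() returns at once with a Future for each call
    futures = {pool.submit(square, x): x for x in range(5)}
    results = {}
    # as_completed() yields futures in the order they finish
    for fut in as_completed(futures):
        results[futures[fut]] = fut.result()

print(sorted(results.items()))
```

Keying the dictionary by future makes it easy to match each result back to its input, since completion order is not guaranteed.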

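## Using processes in parallel with `ProcessPoolExecutor`

We get a roughly linear speedup as expected: 10 jobs on 4 workers take 1.5 s of wall time against 4.21 s serially. Note that the estimates below repeat in groups, presumably because the workers are forked copies of the same process and so inherit the same `rand()` state.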

```
In [12]:
```

```
%%time
with ProcessPoolExecutor(max_workers=4) as pool:
    res = pool.map(mc_pi_cython, [int(1e7) for i in range(10)])
```

```
CPU times: user 16 ms, sys: 20 ms, total: 36 ms
Wall time: 1.5 s
```

```
In [13]:
```

```
np.array(list(res))
```

```
Out[13]:
```

```
array([ 3.1412484, 3.1412484, 3.1412484, 3.1412484, 3.1412768,
3.1412768, 3.1412768, 3.1412768, 3.1412796, 3.1412796])
```

### When you have many jobs

The `Future` objects give fine control over each job, such as adding
callbacks and canceling a submitted job, but they are computationally
expensive. We can use the `chunksize` argument to reduce this cost
when submitting many jobs.
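Conceptually, `chunksize` groups the inputs into batches so that each worker task handles many items at once and the per-task overhead is paid once per batch. The sketch below illustrates the idea only; `chunked` and `run_chunk` are hypothetical helpers, not the executor's actual internals.

```python
from itertools import islice

def chunked(iterable, chunksize):
    """Yield successive tuples of at most `chunksize` items."""
    it = iter(iterable)
    while True:
        chunk = tuple(islice(it, chunksize))
        if not chunk:
            return
        yield chunk

def run_chunk(fn, chunk):
    """Apply `fn` to every item of one chunk (one task for one worker)."""
    return [fn(x) for x in chunk]

jobs = range(10_000)
tasks = list(chunked(jobs, 100))

# 10,000 jobs become 100 tasks of 100 items each
print(len(tasks), len(tasks[0]))
```

With a chunk size of 1, the same 10,000 jobs would be dispatched as 10,000 separate tasks.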

#### Using default chunksize of 1 for 10000 jobs

The total amount of computation whether you have 10 jobs of size 10,000,000 or 10,000 jobs of size 10,000 is essentially the same, so we would expect them both to take about the same amount of time.

```
In [14]:
```

```
%%time
with ProcessPoolExecutor(max_workers=4) as pool:
    res = pool.map(mc_pi_cython, [int(1e4) for i in range(int(1e4))])
```

```
CPU times: user 3.02 s, sys: 2.59 s, total: 5.61 s
Wall time: 4.2 s
```

#### Using chunksize of 100

```
In [15]:
```

```
%%time
with ProcessPoolExecutor(max_workers=4) as pool:
    res = pool.map(mc_pi_cython, [int(1e4) for i in range(int(1e4))], chunksize=100)
```

```
CPU times: user 48 ms, sys: 60 ms, total: 108 ms
Wall time: 1.32 s
```

### Fine control of processes

#### Status of processes

```
In [18]:
```

```
def f1(x):
    return x**2

def f2(x, y):
    return x*y
```

```
In [19]:
```

```
with ProcessPoolExecutor(max_workers=4) as pool:
    a = pool.submit(f2, 1, 1)
    b = pool.submit(f2, 1, 2)
    c = pool.submit(f1, 10)

    # Query the state of each future right after submission
    print('a running:', a.running())
    print('a done:', a.done())
    print('b running:', b.running())
    print('b done:', b.done())
    print('c running:', c.running())
    print('c done:', c.done())

    # result() blocks until the value is available
    print('a result', a.result())
    print('b result', b.result())
    print('c result', c.result())
```

```
a running: True
a done: False
b running: True
b done: False
c running: True
c done: False
a result 1
b result 2
c result 100
```

### Canceling jobs and adding callbacks

```
In [21]:
```

```
njobs = 24
res = []
with ProcessPoolExecutor(max_workers=4) as pool:
    for i in range(njobs):
        res.append(pool.submit(f2, *np.random.rand(2)))
        if i % 2 == 0:
            res[i].add_done_callback(lambda future: print("Process done!"))
    # cancel() only succeeds if the job has not started yet
    res[4].cancel()
    if res[4].cancelled():
        print("Process 4 cancelled")

    for i, x in enumerate(res):
        while x.running():
            print("Running")
            time.sleep(1)
        if not x.cancelled():
            print(x.result())
```

```
Process done!
0.647142783744
0.0165398742026
Running
Process done!
Process done!
Process done!
Process done!
Process done!
Process done!
Process done!
Process done!
Process done!
Process done!
Process done!
0.0678962539814
0.285092840593
0.00702848555339
0.147327841571
0.0804591350953
0.00422308306556
0.224027957196
0.219935389538
0.390023881571
0.00762518296509
0.174535563025
0.202128563571
0.134040028724
0.780790978879
0.0476424379436
0.946881166408
0.270935375167
0.0154612806871
0.0158441978589
0.0907343392355
0.0266080145638
0.0754518489776
```
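In the output above, job 4 was not cancelled: `cancel()` only succeeds while a job is still pending. The following sketch makes that behavior deterministic under stated assumptions: a single-worker thread pool (chosen so the snippet runs anywhere) and a hypothetical `blocker` function that holds the only worker until released.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

gate = threading.Event()      # releases the blocking job
started = threading.Event()   # signals that the worker picked it up
events = []

def blocker():
    started.set()
    gate.wait()
    return "first"

with ThreadPoolExecutor(max_workers=1) as pool:
    running = pool.submit(blocker)
    started.wait()                       # first job is now running
    queued = pool.submit(pow, 2, 10)     # waits behind the first job
    queued.add_done_callback(lambda f: events.append(f.cancelled()))

    cancelled_queued = queued.cancel()    # pending job: can be cancelled
    cancelled_running = running.cancel()  # running job: cannot be cancelled

    gate.set()
    first = running.result()

print(cancelled_queued, cancelled_running, first, events)
```

The callback fires when the queued job is cancelled, so callbacks should check `future.cancelled()` before calling `future.result()`.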

### Functions with multiple arguments

```
In [22]:
```

```
def f(a, b):
    return a + b
```

#### Using a function adapter

```
In [23]:
```

```
def f_(args):
    # Unpack a single sequence argument into the two arguments of f
    return f(*args)
```

```
In [24]:
```

```
xs = np.arange(24)
chunks = np.array_split(xs, xs.shape[0]//2)
```

```
In [25]:
```

```
chunks
```

```
Out[25]:
```

```
[array([0, 1]),
array([2, 3]),
array([4, 5]),
array([6, 7]),
array([8, 9]),
array([10, 11]),
array([12, 13]),
array([14, 15]),
array([16, 17]),
array([18, 19]),
array([20, 21]),
array([22, 23])]
```

```
In [26]:
```

```
with ProcessPoolExecutor(max_workers=4) as pool:
    res = pool.map(f_, chunks)
list(res)
```

```
Out[26]:
```

```
[1, 5, 9, 13, 17, 21, 25, 29, 33, 37, 41, 45]
```
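As an alternative to the adapter, `Executor.map` accepts several iterables and passes one item from each to the function, much like the built-in `map`. A minimal sketch, using a thread pool so it runs anywhere (the `add` function and the `firsts`/`seconds` names are illustrative):

```python
from concurrent.futures import ThreadPoolExecutor

def add(a, b):
    return a + b

xs = list(range(24))
firsts, seconds = xs[0::2], xs[1::2]   # even- and odd-indexed items

with ThreadPoolExecutor(max_workers=4) as pool:
    # One item is drawn from each iterable per call: add(0, 1), add(2, 3), ...
    sums = list(pool.map(add, firsts, seconds))

print(sums)
```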

## Using threads in parallel with `ThreadPoolExecutor`

We do not get any speedup because the GIL allows only one thread to execute Python code at a time.
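The GIL limits CPU-bound work; threads that spend their time waiting still overlap, because blocking calls such as `time.sleep` release the GIL. A small sketch of that contrast, with a hypothetical `wait_briefly` task standing in for I/O:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def wait_briefly(seconds):
    time.sleep(seconds)   # sleeping releases the GIL
    return seconds

delays = [0.2] * 4        # run one after another, these would take 0.8 s

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    done = list(pool.map(wait_briefly, delays))
elapsed = time.perf_counter() - start

print(f"{elapsed:.2f} s")
```

The four waits overlap, so the elapsed time should be close to a single delay rather than their sum.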

```
In [27]:
```

```
%%time
with ThreadPoolExecutor(max_workers=4) as pool:
    res = pool.map(mc_pi_cython, [int(1e7) for i in range(10)])
```

```
CPU times: user 4.34 s, sys: 32 ms, total: 4.38 s
Wall time: 4.32 s
```

```
In [28]:
```

```
np.array(list(res))
```

```
Out[28]:
```

```
array([ 3.1412484, 3.1412768, 3.1412796, 3.1421904, 3.1422264,
3.1411704, 3.1415984, 3.1423524, 3.1411896, 3.1417464])
```

## Turning off the GIL in `cython`

```
In [33]:
```

```
%%cython
import cython
from libc.stdlib cimport rand

cdef extern from "limits.h":
    int INT_MAX

@cython.cdivision(True)
def mc_pi_cython_nogil(int n):
    cdef double s = 0.0
    cdef double x, y
    cdef int i
    # Release the GIL for the loop, which touches no Python objects
    with cython.nogil:
        for i in range(n):
            x = 2*(rand()/float(INT_MAX)) - 1
            y = 2*(rand()/float(INT_MAX)) - 1
            if (x**2 + y**2) < 1:
                s += 1
    return 4*s/n
```

```
In [34]:
```

```
%%time
res = [mc_pi_cython_nogil(int(1e7)) for i in range(10)]
```

```
CPU times: user 4.08 s, sys: 0 ns, total: 4.08 s
Wall time: 4.08 s
```

```
In [35]:
```

```
np.array(res)
```

```
Out[35]:
```

```
array([ 3.141222 , 3.1419168, 3.1416088, 3.1414532, 3.142072 ,
3.141182 , 3.142152 , 3.1412056, 3.142742 , 3.141344 ])
```

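## Using threads in parallel with `ThreadPoolExecutor` and `nogil`

With the GIL released, the threads can run on separate cores, and threads carry less overhead than processes. The timing recorded below, however, does not show a speedup: wall time is 13.4 s against 4.08 s for the serial run, with most of the CPU time spent in the system. A likely cause is that `rand()` keeps hidden global state that the threads contend for, so a per-thread random number generator would be needed to realize the expected linear speedup.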

```
In [36]:
```

```
%%time
with ThreadPoolExecutor(max_workers=4) as pool:
    res = pool.map(mc_pi_cython_nogil, [int(1e7) for i in range(10)])
```

```
CPU times: user 9.65 s, sys: 35.2 s, total: 44.8 s
Wall time: 13.4 s
```

```
In [37]:
```

```
np.array(list(res))
```

```
Out[37]:
```

```
array([ 3.1419416, 3.1409748, 3.141634 , 3.1413224, 3.1419116,
3.1409356, 3.14161 , 3.1404276, 3.142516 , 3.1415676])
```

## Using `multiprocessing`

One nice thing about using `multiprocessing` is that it works equally
well for small numbers of large jobs or large numbers of small jobs out
of the box.
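One reason is that `Pool.map` chooses a chunk size itself when none is given. The sketch below reproduces the heuristic used in CPython's implementation at the time of writing (roughly four chunks per worker); it is an implementation detail that may change, and `default_chunksize` is a hypothetical name used only for illustration.

```python
def default_chunksize(njobs, nworkers):
    # Aim for about four chunks per worker, rounding the chunk size up
    chunksize, extra = divmod(njobs, nworkers * 4)
    if extra:
        chunksize += 1
    return chunksize

print(default_chunksize(10, 4))       # few large jobs: one job per task
print(default_chunksize(10_000, 4))   # many small jobs: large batches
```

For 10 jobs on 4 workers this gives a chunk size of 1, and for 10,000 jobs it gives 625, so the many-small-jobs case is batched automatically.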

```
In [41]:
```

```
%%time
with mp.Pool(processes=4) as pool:
    res = pool.map(mc_pi_cython, [int(1e7) for i in range(10)])
```

```
CPU times: user 8 ms, sys: 32 ms, total: 40 ms
Wall time: 1.44 s
```

```
In [42]:
```

```
np.array(res)
```

```
Out[42]:
```

```
array([ 3.1412268, 3.1412268, 3.1412268, 3.1412268, 3.1400984,
3.1400984, 3.1400984, 3.1400984, 3.1408268, 3.1408268])
```

```
In [43]:
```

```
%%time
with mp.Pool(processes=4) as pool:
    res = pool.map(mc_pi_cython, [int(1e4) for i in range(int(1e4))])
```

```
CPU times: user 16 ms, sys: 28 ms, total: 44 ms
Wall time: 1.24 s
```

```
In [45]:
```

```
np.array(res)
```

```
Out[45]:
```

```
array([ 3.1288, 3.1468, 3.1356, ..., 3.1228, 3.1272, 3.1036])
```

### Creating individual processes

```
In [46]:
```

```
def f(i):
    time.sleep(np.random.random())
    print(os.getpid(), i)
```

```
In [49]:
```

```
for i in range(10):
    p = mp.Process(target=f, args=(i,))
    p.start()
    p.join()  # wait for this process before starting the next
```

```
2248 0
2251 1
2254 2
2257 3
2260 4
2263 5
2266 6
2269 7
2272 8
2275 9
```

### Functions with multiple arguments

The `multiprocessing` `Pool` has a `starmap` method that removes the need
to write a wrapper function.

```
In [50]:
```

```
def f(a, b):
    return a + b
```

```
In [51]:
```

```
xs = np.arange(24)
with Pool(processes=4) as pool:
    res = pool.starmap(f, np.array_split(xs, xs.shape[0]//2))
list(res)
```

```
Out[51]:
```

```
[1, 5, 9, 13, 17, 21, 25, 29, 33, 37, 41, 45]
```

#### Partial application

Sometimes, `functools.partial` can be used to reduce the number of
arguments needed to just one.

```
In [52]:
```

```
def f(a, b):
    return a * b
```

```
In [53]:
```

```
from functools import partial
fp = partial(f, b=2)
```

```
In [54]:
```

```
xs = np.arange(24)
with Pool(processes=4) as pool:
    res = pool.map(fp, xs)
np.array(list(res))
```

```
Out[54]:
```

```
array([ 0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32,
34, 36, 38, 40, 42, 44, 46])
```

#### How do we get a return value from a process?

```
In [55]:
```

```
def f1(q, i):
    time.sleep(np.random.random())
    q.put((os.getpid(), i))
```

```
In [56]:
```

```
q = mp.Queue()
res = []
for i in range(10):
    p = mp.Process(target=f1, args=(q, i,))
    p.start()
    res.append(q.get())  # blocks until the child puts its result
    p.join()
res
```

```
Out[56]:
```

```
[(2292, 0),
(2294, 1),
(2296, 2),
(2298, 3),
(2300, 4),
(2302, 5),
(2304, 6),
(2306, 7),
(2308, 8),
(2310, 9)]
```

#### Counting number of jobs (1)

```
In [57]:
```

```
def f2(i):
    global counter
    counter = counter + 1
    print(os.getpid(), i)
```

#### Checking

```
In [58]:
```

```
counter = 0
f2(10)
print(counter)
```

```
1997 10
1
```

```
In [59]:
```

```
counter = 0
for i in range(10):
    p = mp.Process(target=f2, args=(i,))
    p.start()
    p.join()
```

```
2312 0
2315 1
2318 2
2321 3
2324 4
2327 5
2330 6
2333 7
2336 8
2339 9
```

#### Counting number of jobs (2)

We can use shared memory to do this, but it is slow because access must be synchronized so that only one process uses `counter` at any one time. `multiprocessing` provides `Value` and `Array` shared memory variables, but you can also convert arbitrary Python variables into shared memory objects (less efficient).
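When several processes update a `Value` at the same time, `counter.value += 1` is a separate read and write, so the update should be wrapped in the lock returned by `get_lock()`. The sketch below assumes a platform that supports the `fork` start method (for example Linux); `bump` and `count_with_lock` are hypothetical helpers.

```python
import multiprocessing as mp

def bump(counter, n):
    for _ in range(n):
        # Hold the lock across the read-modify-write so no update is lost
        with counter.get_lock():
            counter.value += 1

def count_with_lock(nprocs=4, n=1000):
    ctx = mp.get_context("fork")   # assumption: fork is available
    counter = ctx.Value("i", 0)
    procs = [ctx.Process(target=bump, args=(counter, n)) for _ in range(nprocs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value

total = count_with_lock()
print(total)
```

Here the processes run concurrently rather than one after another, which is the situation in which the lock matters.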

```
In [61]:
```

```
def f3(i, counter, store):
    counter.value += 1
    store[os.getpid() % 10] += i
```

```
In [62]:
```

```
%%time
counter = mp.Value('i', 0)
store = mp.Array('i', [0]*10)
for i in range(int(1e2)):
    p = mp.Process(target=f3, args=(i, counter, store))
    p.start()
    p.join()
print(counter.value)
print(store[:])
```

```
100
[530, 540, 450, 460, 470, 480, 490, 500, 510, 520]
CPU times: user 144 ms, sys: 316 ms, total: 460 ms
Wall time: 792 ms
```

#### Counting number of jobs (3)

We should try to avoid using shared memory as much as possible in
parallel jobs, as it drastically reduces efficiency. One useful approach
is to use the `map-reduce` pattern. We should also use `Pool` to reuse
processes rather than spawn too many of them. We will see much more of
the `map-reduce` approach when we work with Spark.

```
In [63]:
```

```
def f4(i):
    return (os.getpid(), 1, i)
```

```
In [64]:
```

```
%%time
# map step
with mp.Pool(processes=10) as pool:
    res = pool.map(f4, range(int(1e2)))

# reduce step
res = np.array(res)
counter = res[:, 1].sum()
print(counter)
store = np.zeros(10)
idx = res[:, 0] % 10
for i in range(10):
    store[i] = res[idx==i, 2].sum()
print(store)
```

```
100
[ 0. 0. 1101. 369. 699. 237. 894. 723. 369. 558.]
CPU times: user 24 ms, sys: 44 ms, total: 68 ms
Wall time: 179 ms
```
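The reduce step does not depend on NumPy. As a minimal sketch using only the standard library, the same aggregation can be written with `sum` and a `defaultdict`; the `mapped` list below is made-up data in the shape returned by `f4`, not output from the run above.

```python
from collections import defaultdict

# Hypothetical map output: (pid, 1, i) tuples shaped like f4's return value
mapped = [(2000 + i % 3, 1, i) for i in range(10)]

# Reduce 1: total number of jobs
counter = sum(one for _, one, _ in mapped)

# Reduce 2: sum of job indices grouped by the last digit of the pid
store = defaultdict(int)
for pid, _, i in mapped:
    store[pid % 10] += i

print(counter, dict(store))
```

Because each worker only returns values, no state is shared between processes and all aggregation happens in the parent.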