Multi-Core Parallelism¶
[1]:
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
import os
[2]:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import multiprocessing as mp
from multiprocessing import Pool, Value, Array
import time
Vanilla Python¶
Toy problem: Estimate \(\pi\) by sampling points at random within a box circumscribing the unit circle and counting the fraction that fall within the circle. Since the circle has area \(\pi\) and the box has area 4, this fraction approximates \(\pi/4\). This is a simple example of a Monte Carlo algorithm. We will parallelize this embarrassingly parallel problem.
[3]:
def mc_pi(n):
    s = 0
    for i in range(n):
        x = np.random.uniform(-1, 1)
        y = np.random.uniform(-1, 1)
        if (x**2 + y**2) < 1:
            s += 1
    return 4*s/n
[4]:
%%time
mc_pi(int(1e5))
CPU times: user 624 ms, sys: 4 ms, total: 628 ms
Wall time: 626 ms
[4]:
3.14564
[5]:
%%time
res = [mc_pi(int(1e5)) for i in range(10)]
CPU times: user 5.94 s, sys: 0 ns, total: 5.94 s
Wall time: 5.93 s
The concurrent.futures module¶
Concurrent processes are processes that will return the same results regardless of the order in which they were executed. A “future” is an object that will hold the result of a computation at some point in the future. The concurrent.futures module provides executors, which can be fed functions to be scheduled for future execution. This gives us a simple model for parallel execution on a multi-core machine.
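As a minimal sketch of the abstraction (reusing the mc_pi function defined above): submit returns a future immediately, and result() blocks until the value is ready.
with ProcessPoolExecutor(max_workers=1) as pool:
    future = pool.submit(mc_pi, int(1e4))    # returns a future immediately
    print(future.result())                   # blocks until the value is ready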
Using processes in parallel with ProcessPoolExecutor¶
We get a roughly linear speedup, as expected.
[6]:
%%time
with ProcessPoolExecutor(max_workers=4) as pool:
    res = pool.map(mc_pi, [int(1e5) for i in range(10)])
CPU times: user 32 ms, sys: 20 ms, total: 52 ms
Wall time: 1.86 s
[7]:
np.array(list(res))
[7]:
array([3.13784, 3.13784, 3.13784, 3.13784, 3.13968, 3.13968, 3.13968,
3.13968, 3.14136, 3.14136])
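Note the repeated values in the array above. With the default fork start method on Linux, each worker process inherits the same numpy random state from the parent, so different workers can generate identical streams. One hedged fix is to pass an explicit seed with each task; the helper below (mc_pi_seeded, a hypothetical vectorized variant) sketches this:
def mc_pi_seeded(args):
    n, seed = args
    rng = np.random.RandomState(seed)   # independent generator per task
    pts = rng.uniform(-1, 1, (n, 2))
    return 4 * np.mean((pts**2).sum(axis=1) < 1)

with ProcessPoolExecutor(max_workers=4) as pool:
    res = pool.map(mc_pi_seeded, [(int(1e5), seed) for seed in range(10)])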
When you have many jobs¶
The futures object gives fine control over the process, such as adding callbacks and canceling a submitted job, but this control is computationally expensive. When submitting many jobs, we can use the chunksize argument to reduce this cost: it specifies the number of tasks to be given to a worker at a time. A detailed explanation of chunksize is provided here.
Using default chunksize¶
The total amount of computation is essentially the same whether you have 10 jobs of size 100,000 or 10,000 jobs of size 100, so we might expect both to take about the same amount of time. This is not the case, because of the per-job overhead described above.
[8]:
%%time
with ProcessPoolExecutor(max_workers=4) as pool:
    res = pool.map(mc_pi, [int(1e2) for i in range(int(1e4))])
CPU times: user 3.26 s, sys: 1.81 s, total: 5.07 s
Wall time: 3.19 s
Using chunksize of 100¶
[9]:
%%time
with ProcessPoolExecutor(max_workers=4) as pool:
    res = pool.map(mc_pi, [int(1e2) for i in range(int(1e4))], chunksize=100)
CPU times: user 56 ms, sys: 52 ms, total: 108 ms
Wall time: 1.5 s
Fine control of processes¶
Status of processes¶
[10]:
def f1(x):
    return x**2

def f2(x, y):
    return x*y
[11]:
with ProcessPoolExecutor(max_workers=4) as pool:
    a = pool.submit(f2, 1, 1)
    b = pool.submit(f2, 1, 2)
    c = pool.submit(f1, 10)

    print('a running:', a.running())
    print('a done:', a.done())

    print('b running:', b.running())
    print('b done:', b.done())

    print('c running:', c.running())
    print('c done:', c.done())

    print('a result', a.result())
    print('b result', b.result())
    print('c result', c.result())
a running: True
a done: True
b running: False
b done: True
c running: False
c done: True
a result 1
b result 2
c result 100
Canceling jobs and adding callbacks¶
For example, if you launch multiple versions of the same task for safety, you might want to cancel the duplicate tasks once one of them has completed.
Callbacks allow a future to notify you when some event occurs, such as a job completing.
[12]:
njobs = 24
res = []
with ProcessPoolExecutor(max_workers=4) as pool:
    for i in range(njobs):
        res.append(pool.submit(f2, *np.random.rand(2)))
        if i % 2 == 0:
            res[i].add_done_callback(lambda future: print("Process done!"))
    res[4].cancel()
    if res[4].cancelled():
        print("Process 4 cancelled")

    for i, x in enumerate(res):
        while x.running():
            print("Running")
            time.sleep(1)
        if not x.cancelled():
            print(x.result())
Process done!
Process 4 cancelled
Running
Process done!
Process done!
Process done!
Process done!
Process done!
Process done!
Process done!
Process done!
Process done!
Process done!
Process done!
0.6722884468935053
0.545729761075415
0.010586909677611098
0.3381938160419955
0.4116097214015501
0.18973574383421796
0.7778720057180574
0.10077814728759954
0.06055768121630078
0.3020277653820008
0.06874279853124576
0.17851026444067972
0.08987112970497925
0.07174658531419248
0.07864945674072424
0.13488073927114674
0.10792043155043497
0.7681153304311592
0.4552605247520855
0.7565851958126836
0.3062866391681099
0.21432713825666197
0.4555355058023458
Functions with multiple arguments¶
Using a pool and the map method with functions requiring multiple arguments can be done in two ways.
[13]:
def f(a, b):
    return a + b
Using a function adapter¶
[14]:
def f_(args):
    return f(*args)
[15]:
xs = np.arange(24)
chunks = np.array_split(xs, xs.shape[0]//2)
[16]:
chunks
[16]:
[array([0, 1]),
array([2, 3]),
array([4, 5]),
array([6, 7]),
array([8, 9]),
array([10, 11]),
array([12, 13]),
array([14, 15]),
array([16, 17]),
array([18, 19]),
array([20, 21]),
array([22, 23])]
[17]:
with ProcessPoolExecutor(max_workers=4) as pool:
    res = pool.map(f_, chunks)
list(res)
[17]:
[1, 5, 9, 13, 17, 21, 25, 29, 33, 37, 41, 45]
Using multiple argument iterables¶
[18]:
with ProcessPoolExecutor(max_workers=4) as pool:
    res = pool.map(f, range(0, 24, 2), range(1, 24, 2))
list(res)
[18]:
[1, 5, 9, 13, 17, 21, 25, 29, 33, 37, 41, 45]
Using threads in parallel with ThreadPoolExecutor¶
We do not get any speedup, because the GIL only allows one thread to run at a time.
[19]:
%%time
with ThreadPoolExecutor(max_workers=4) as pool:
    res = pool.map(mc_pi, [int(1e5) for i in range(10)])
CPU times: user 6.7 s, sys: 24 ms, total: 6.72 s
Wall time: 6.6 s
[20]:
np.array(list(res))
[20]:
array([3.14072, 3.13664, 3.14116, 3.13712, 3.13772, 3.13812, 3.13192,
3.14952, 3.14216, 3.15732])
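Threads do help when the work is I/O-bound or otherwise releases the GIL. A small sketch with a hypothetical sleeping task (time.sleep releases the GIL, so the four calls overlap):
def io_task(n):
    time.sleep(1)    # simulates I/O; the GIL is released while sleeping
    return n

with ThreadPoolExecutor(max_workers=4) as pool:
    res = list(pool.map(io_task, range(4)))   # ~1 s wall time rather than ~4 s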
Using multiprocessing¶
The concurrent.futures.ProcessPoolExecutor is actually a wrapper for multiprocessing.Pool to unify the threading and process interfaces. I typically just work directly with multiprocessing, since I don’t have much use for threads. One nice thing about using multiprocessing, apart from the finer-grained control if you need it, is that it typically works equally well out of the box for small numbers of large jobs or large numbers of small jobs, since it uses a heuristic to guess the optimal chunksize.
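For reference, the default chunksize that Pool.map picks looks roughly like the sketch below (based on the logic in CPython’s multiprocessing.pool; exact details may vary across versions):
def guess_chunksize(n_tasks, n_workers):
    # divide the tasks into about 4 chunks per worker, rounding up
    chunksize, extra = divmod(n_tasks, n_workers * 4)
    if extra:
        chunksize += 1
    return chunksize

guess_chunksize(int(1e4), 4)   # 625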
[21]:
%%time
with mp.Pool(processes=4) as pool:
    res = pool.map(mc_pi, [int(1e5) for i in range(10)])
CPU times: user 0 ns, sys: 28 ms, total: 28 ms
Wall time: 1.93 s
[22]:
np.array(res)
[22]:
array([3.14008, 3.14008, 3.14008, 3.14008, 3.14616, 3.14616, 3.14616,
3.14616, 3.1332 , 3.1332 ])
[23]:
%%time
with mp.Pool(processes=4) as pool:
    res = pool.map(mc_pi, [int(1e2) for i in range(int(1e4))])
CPU times: user 8 ms, sys: 24 ms, total: 32 ms
Wall time: 1.63 s
[24]:
np.array(res)
[24]:
array([3.2 , 3.24, 3.16, ..., 3.04, 3.16, 3.36])
Functions with multiple arguments¶
Multiprocessing Pool has a starmap method that removes the need to write a wrapper function.
[25]:
def f(a, b):
    return a + b
[26]:
xs = np.arange(24)
with Pool(processes=4) as pool:
    res = pool.starmap(f, np.array_split(xs, xs.shape[0]//2))
list(res)
[26]:
[1, 5, 9, 13, 17, 21, 25, 29, 33, 37, 41, 45]
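starmap also accepts zipped argument iterables, mirroring the multiple-iterable form of map shown earlier; a sketch equivalent to the cell above:
with Pool(processes=4) as pool:
    res = pool.starmap(f, zip(range(0, 24, 2), range(1, 24, 2)))
list(res)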
Partial application¶
Sometimes, functools.partial can be used to reduce the number of arguments needed to just one.
[27]:
def f(a, b):
    return a * b
[28]:
from functools import partial
fp = partial(f, b=2)
[29]:
xs = np.arange(24)
with Pool(processes=4) as pool:
    res = pool.map(fp, xs)
np.array(list(res))
[29]:
array([ 0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32,
34, 36, 38, 40, 42, 44, 46])
Blocking and non-blocking calls¶
A blocking call such as map does not return until all the results are ready, while a non-blocking call such as map_async returns immediately with a handle that can be polled or waited on.
[30]:
def func(n):
    time.sleep(n)
    return n
[31]:
with Pool(processes=4) as pool:
    res = pool.map(func, [3,3,3,3,3])
    print("Control back!")
res
Control back!
[31]:
[3, 3, 3, 3, 3]
[32]:
with Pool(processes=4) as pool:
    res = pool.map_async(func, [3,3,3,3,3])
    print("Control back!")
    print(res.ready())
    res.wait()
    print(res.ready())
    print(res.get())
Control back!
False
True
[3, 3, 3, 3, 3]
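map_async can also take a callback, which is invoked with the full list of results once every job has finished; a minimal sketch:
with Pool(processes=4) as pool:
    res = pool.map_async(func, [1, 1, 1],
                         callback=lambda results: print("All done:", results))
    res.wait()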
Different jobs to different processes¶
apply blocks until each submitted job completes, so the jobs run one after another; apply_async submits them all at once so they can run concurrently.
[33]:
def f1(n):
    time.sleep(1)
    return n

def f2(n):
    time.sleep(1)
    return n**2

def f3(n):
    time.sleep(1)
    return n**3

def f4(n):
    time.sleep(1)
    return n**4
[34]:
%%time
with Pool(processes=4) as pool:
    res = []
    for i, f in enumerate([f4, f2, f3, f1]):
        res.append((i, pool.apply(f, [2])))
    print(res)
[(0, 16), (1, 4), (2, 8), (3, 2)]
CPU times: user 8 ms, sys: 24 ms, total: 32 ms
Wall time: 4.13 s
[35]:
%%time
with Pool(processes=4) as pool:
    res = []
    for i, f in enumerate([f4, f2, f3, f1]):
        res.append((i, pool.apply_async(f, [2])))
    print([(i, r.get()) for i, r in res])
[(0, 16), (1, 4), (2, 8), (3, 2)]
CPU times: user 8 ms, sys: 16 ms, total: 24 ms
Wall time: 1.12 s
Creating individual processes¶
If you need more control over individual processes than Pool provides, namely if you need to share information across processes, you can work with individual workers and thread-safe memory structures. This is included just for completeness, as most data processing tasks do not require this level of control.
[36]:
def f(i):
    time.sleep(np.random.random())
    print(os.getpid(), i)
[37]:
for i in range(10):
    p = mp.Process(target=f, args=(i,))
    p.start()
    p.join()
45199 0
45202 1
45205 2
45208 3
45211 4
45214 5
45217 6
45220 7
45223 8
45226 9
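Note that calling start and join inside the same loop iteration runs the jobs one after another. To run them concurrently, start all the processes first and join them afterwards:
ps = [mp.Process(target=f, args=(i,)) for i in range(10)]
for p in ps:
    p.start()
for p in ps:
    p.join()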
Using Value and Array for sharing data¶
Counting number of jobs (1)¶
This does not work, because each spawned process gets its own copy of the global counter and increments that copy; the parent’s counter is never changed.
[40]:
def f2(i):
    global counter
    counter = counter + 1
    print(os.getpid(), i)
Checking¶
[41]:
counter = 0
f2(10)
print(counter)
45068 10
1
[42]:
counter = 0
for i in range(10):
    p = mp.Process(target=f2, args=(i,))
    p.start()
    p.join()
45249 0
45252 1
45255 2
45258 3
45261 4
45264 5
45267 6
45270 7
45273 8
45276 9
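Checking the counter in the parent afterwards makes the problem visible; a quick check (assumed output 0):
print(counter)   # expected: 0, since the parent's global was never incremented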
Counting number of jobs (2)¶
We can use shared memory to do this, but it is slow because multiprocessing has to ensure that only one process gets to use counter at any one time. Multiprocessing provides Value and Array shared memory variables, but you can also convert arbitrary Python variables into shared objects (less efficient; a sketch follows the timing below).
[44]:
def f3(i, counter, store):
    counter.value += 1
    store[os.getpid() % 10] += 1
[45]:
%%time
counter = mp.Value('i', 0)
store = mp.Array('i', [0]*10)
for i in range(int(1e2)):
    p = mp.Process(target=f3, args=(i, counter, store))
    p.start()
    p.join()
print(counter.value)
print(store[:])
100
[10, 10, 10, 10, 10, 10, 10, 10, 10, 10]
CPU times: user 52 ms, sys: 256 ms, total: 308 ms
Wall time: 735 ms
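Converting arbitrary Python objects into shared state is usually done through a Manager, which holds the object in a server process and hands out proxies; a minimal sketch with a hypothetical f_managed:
def f_managed(d):
    d['count'] += 1    # the proxy forwards the update to the manager process

with mp.Manager() as manager:
    d = manager.dict(count=0)
    for i in range(10):
        p = mp.Process(target=f_managed, args=(d,))
        p.start()
        p.join()
    print(d['count'])   # 10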
Counting number of jobs (3)¶
We should try to avoid using shared memory as much as possible in parallel jobs, as it drastically reduces efficiency. One useful approach is to use the map-reduce pattern. We should also use Pool to reuse processes rather than spawning too many of them.
[46]:
def f4(i):
    return (os.getpid(), 1, i)
[47]:
%%time
# map step
with mp.Pool(processes=4) as pool:
    res = pool.map(f4, range(int(1e2)))
CPU times: user 4 ms, sys: 20 ms, total: 24 ms
Wall time: 150 ms
Reduce steps¶
[48]:
res = np.array(res)
[49]:
res[np.random.choice(len(res), 10)]
[49]:
array([[45381, 1, 57],
[45379, 1, 2],
[45379, 1, 4],
[45380, 1, 10],
[45381, 1, 19],
[45381, 1, 58],
[45379, 1, 93],
[45380, 1, 7],
[45381, 1, 60],
[45379, 1, 42]])
[50]:
import pandas as pd
[51]:
df = pd.DataFrame(res, columns=['pid', 'one', 'i'])
[52]:
df.groupby('pid').sum()
[52]:
pid | one | i
---|---|---
45379 | 28 | 1211
45380 | 28 | 1358
45381 | 28 | 1652
45382 | 16 | 729
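The same reduction can also be done without pandas, using plain dictionaries over the (pid, one, i) triples; a sketch:
from collections import defaultdict

counts = defaultdict(int)
totals = defaultdict(int)
for pid, one, i in res:
    counts[pid] += one   # number of jobs each process ran
    totals[pid] += i     # sum of the i values each process handled
dict(counts)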