Multi-Core Parallelism

[1]:
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
import os
[2]:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import multiprocessing as mp
from multiprocessing import Pool, Value, Array
import time

Vanilla Python

Toy problem: Estimate \(\pi\) by sampling points at random within a box circumscribing the unit circle and counting the fraction that fall within the circle. This is a simple example of a Monte Carlo algorithm. We will parallelize this embarrassingly parallel problem.

[3]:
def mc_pi(n):
    s = 0
    for i in range(n):
        x = np.random.uniform(-1, 1)
        y = np.random.uniform(-1, 1)
        if (x**2 + y**2) < 1:
            s += 1
    return 4*s/n
[4]:
%%time

mc_pi(int(1e5))
CPU times: user 624 ms, sys: 4 ms, total: 628 ms
Wall time: 626 ms
[4]:
3.14564
[5]:
%%time

res = [mc_pi(int(1e5)) for i in range(10)]
CPU times: user 5.94 s, sys: 0 ns, total: 5.94 s
Wall time: 5.93 s

The concurrent.futures module

Concurrent processes are processes that return the same results regardless of the order in which they are executed. A “future” is an object that will hold a result at some point in the future. The concurrent.futures module provides executors that can be fed functions to be scheduled for future execution. This gives us a simple model for parallel execution on a multi-core machine.
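
A minimal sketch of the interface (the function name square is illustrative): work is handed to an executor with submit, which immediately returns a future; calling result() blocks until the value is available.

def square(x):
    return x * x

with ProcessPoolExecutor(max_workers=2) as pool:
    fut = pool.submit(square, 3)   # returns a Future immediately
    print(fut.done())              # may still be False while the worker runs
    print(fut.result())            # blocks until the value is ready, then gives 9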

Using processes in parallel with ProcessPoolExecutor

We get a roughly linear speedup with the number of workers, as expected.

[6]:
%%time

with ProcessPoolExecutor(max_workers=4) as pool:
    res = pool.map(mc_pi, [int(1e5) for i in range(10)])
CPU times: user 32 ms, sys: 20 ms, total: 52 ms
Wall time: 1.86 s
[7]:
np.array(list(res))
[7]:
array([3.13784, 3.13784, 3.13784, 3.13784, 3.13968, 3.13968, 3.13968,
       3.13968, 3.14136, 3.14136])

When you have many jobs

The futures object gives fine control over the process, such as adding callbacks and canceling a submitted job, but is computationally expensive. We can use the chunksize argument to reduce this cost when submitting many jobs: it specifies the number of tasks to be given to a worker at a time. A detailed explanation of chunksize is provided here.
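
The two cells below compare the default against chunksize=100. As a further sketch, the same workload can be timed over several chunksizes in a loop (timings will vary by machine):

for cs in (1, 10, 100, 1000):
    start = time.time()
    with ProcessPoolExecutor(max_workers=4) as pool:
        list(pool.map(mc_pi, [int(1e2) for i in range(int(1e4))], chunksize=cs))
    print('chunksize %4d: %.2f s' % (cs, time.time() - start))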

Using default chunksize

The total amount of computation is essentially the same whether you have 10 jobs of size 100,000 or 10,000 jobs of size 100, so we would expect both to take about the same amount of time; this is not the case because of the per-job overhead described above.

[8]:
%%time

with ProcessPoolExecutor(max_workers=4) as pool:
    res = pool.map(mc_pi, [int(1e2) for i in range(int(1e4))])
CPU times: user 3.26 s, sys: 1.81 s, total: 5.07 s
Wall time: 3.19 s

Using chunksize of 100

[9]:
%%time

with ProcessPoolExecutor(max_workers=4) as pool:
    res = pool.map(mc_pi, [int(1e2) for i in range(int(1e4))], chunksize=100)
CPU times: user 56 ms, sys: 52 ms, total: 108 ms
Wall time: 1.5 s

Fine control of processes

Status of processes

[10]:
def f1(x):
    return x**2

def f2(x, y):
    return x*y
[11]:
with ProcessPoolExecutor(max_workers=4) as pool:
    a = pool.submit(f2, 1, 1)
    b = pool.submit(f2, 1,2)
    c = pool.submit(f1, 10)

    print('a running:', a.running())
    print('a done:', a.done())

    print('b running:', b.running())
    print('b done:', b.done())

    print('c running:', c.running())
    print('c done:', c.done())

    print('a result', a.result())
    print('b result', b.result())
    print('c result', c.result())
a running: True
a done: True
b running: False
b done: True
c running: False
c done: True
a result 1
b result 2
c result 100

Canceling jobs and adding callbacks

For example, if you launch multiple versions of the same task for safety, you might want to cancel the duplicate tasks once one of them has completed.

Callbacks allow you to be notified when some event occurs, here when a future completes.
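
A done-callback receives the finished future itself, so it can inspect the outcome before acting; a minimal sketch (the callback name report is illustrative), using f1 from above:

def report(future):
    if future.cancelled():
        print('cancelled')
    elif future.exception() is not None:
        print('failed:', future.exception())
    else:
        print('result:', future.result())

with ProcessPoolExecutor(max_workers=2) as pool:
    fut = pool.submit(f1, 10)
    fut.add_done_callback(report)

The fuller example below combines callbacks with cancellation.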

[12]:
njobs = 24

res = []

with ProcessPoolExecutor(max_workers=4) as pool:

    for i in range(njobs):
        res.append(pool.submit(f2, *np.random.rand(2)))
        if i % 2 == 0:
            res[i].add_done_callback(lambda future: print("Process done!"))
    res[4].cancel()
    if res[4].cancelled():
        print("Process 4 cancelled")

    for i, x in enumerate(res):
        while x.running():
            print("Running")
            time.sleep(1)
        if not x.cancelled():
            print(x.result())
Process done!
Process 4 cancelled
Running
Process done!
Process done!
Process done!
Process done!
Process done!
Process done!
Process done!
Process done!
Process done!
Process done!
Process done!
0.6722884468935053
0.545729761075415
0.010586909677611098
0.3381938160419955
0.4116097214015501
0.18973574383421796
0.7778720057180574
0.10077814728759954
0.06055768121630078
0.3020277653820008
0.06874279853124576
0.17851026444067972
0.08987112970497925
0.07174658531419248
0.07864945674072424
0.13488073927114674
0.10792043155043497
0.7681153304311592
0.4552605247520855
0.7565851958126836
0.3062866391681099
0.21432713825666197
0.4555355058023458

Functions with multiple arguments

Using a pool's map method with functions that require multiple arguments can be done in two ways.

[13]:
def f(a, b):
    return a + b

Using a function adapter

[14]:
def f_(args):
    return f(*args)
[15]:
xs = np.arange(24)
chunks = np.array_split(xs, xs.shape[0]//2)
[16]:
chunks
[16]:
[array([0, 1]),
 array([2, 3]),
 array([4, 5]),
 array([6, 7]),
 array([8, 9]),
 array([10, 11]),
 array([12, 13]),
 array([14, 15]),
 array([16, 17]),
 array([18, 19]),
 array([20, 21]),
 array([22, 23])]
[17]:
with ProcessPoolExecutor(max_workers=4) as pool:
    res = pool.map(f_, chunks)
list(res)
[17]:
[1, 5, 9, 13, 17, 21, 25, 29, 33, 37, 41, 45]

Using multiple argument iterables

[18]:
with ProcessPoolExecutor(max_workers=4) as pool:
    res = pool.map(f, range(0,24,2), range(1,24,2))
list(res)
[18]:
[1, 5, 9, 13, 17, 21, 25, 29, 33, 37, 41, 45]

Using processes in parallel with ThreadPoolExecutor

We do not get any speedup because the GIL allows only one thread to execute Python bytecode at a time; threads only help when the work releases the GIL (see the sketch below).

[19]:
%%time

with ThreadPoolExecutor(max_workers=4) as pool:
    res = pool.map(mc_pi, [int(1e5) for i in range(10)])
CPU times: user 6.7 s, sys: 24 ms, total: 6.72 s
Wall time: 6.6 s
[20]:
np.array(list(res))
[20]:
array([3.14072, 3.13664, 3.14116, 3.13712, 3.13772, 3.13812, 3.13192,
       3.14952, 3.14216, 3.15732])
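
Threads do help when the work releases the GIL, for example while blocking on I/O or sleeping. A small sketch (the function io_task is illustrative; timings are approximate): four sleeping tasks overlap, so the wall time is close to 1 s rather than 4 s.

def io_task(n):
    time.sleep(1)      # sleeping releases the GIL
    return n

start = time.time()
with ThreadPoolExecutor(max_workers=4) as pool:
    res = list(pool.map(io_task, range(4)))
print(res, '%.1f s' % (time.time() - start))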

Using multiprocessing

The concurrent.futures.ProcessPoolExecutor is actually a wrapper for multiprocessing.Pool to unify the threading and process interfaces. I typically just work directly with multiprocessing since I don't have much use for threads. One nice thing about using multiprocessing, apart from the finer-grained control if you need it, is that out of the box it typically works equally well for small numbers of large jobs or large numbers of small jobs, using a heuristic to guess a reasonable chunksize.
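
For reference, a sketch of that heuristic, mirroring the logic in CPython's multiprocessing.pool source (an implementation detail that may differ across versions): the work is split into roughly four chunks per worker.

def default_chunksize(n_tasks, n_workers):
    chunksize, extra = divmod(n_tasks, n_workers * 4)
    if extra:
        chunksize += 1
    return chunksize

default_chunksize(int(1e4), 4)   # 625 tasks per chunk for the example below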

[21]:
%%time

with mp.Pool(processes=4) as pool:
    res = pool.map(mc_pi, [int(1e5) for i in range(10)])
CPU times: user 0 ns, sys: 28 ms, total: 28 ms
Wall time: 1.93 s
[22]:
np.array(res)
[22]:
array([3.14008, 3.14008, 3.14008, 3.14008, 3.14616, 3.14616, 3.14616,
       3.14616, 3.1332 , 3.1332 ])
[23]:
%%time

with mp.Pool(processes=4) as pool:
    res = pool.map(mc_pi, [int(1e2) for i in range(int(1e4))])
CPU times: user 8 ms, sys: 24 ms, total: 32 ms
Wall time: 1.63 s
[24]:
np.array(res)
[24]:
array([3.2 , 3.24, 3.16, ..., 3.04, 3.16, 3.36])

Functions with multiple arguments

Multiprocessing Pool has a starmap method that removes the need to write a wrapper function.

[25]:
def f(a, b):
    return a + b
[26]:
xs = np.arange(24)
with Pool(processes=4) as pool:
    res = pool.starmap(f, np.array_split(xs, xs.shape[0]//2))
list(res)
[26]:
[1, 5, 9, 13, 17, 21, 25, 29, 33, 37, 41, 45]

Partial application

Sometimes, functools.partial can be used to reduce the number of arguments needed to just one.

[27]:
def f(a, b):
    return a * b
[28]:
from functools import partial

fp = partial(f, b=2)
[29]:
xs = np.arange(24)
with Pool(processes=4) as pool:
    res = pool.map(fp, xs)
np.array(list(res))
[29]:
array([ 0,  2,  4,  6,  8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32,
       34, 36, 38, 40, 42, 44, 46])

Blocking and non-blocking calls

[30]:
def func(n):
    time.sleep(n)
    return n
[31]:
with Pool(processes=4) as pool:
    res = pool.map(func, [3,3,3,3,3])
    print("Control back!")
res
Control back!
[31]:
[3, 3, 3, 3, 3]
[32]:
with Pool(processes=4) as pool:
    res = pool.map_async(func, [3,3,3,3,3])
    print("Control back!")
    print(res.ready())
    res.wait()
    print(res.ready())
    print(res.get())
Control back!
False
True
[3, 3, 3, 3, 3]
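
map_async also accepts callback and error_callback arguments, which fire in the parent process once all results are in; a minimal sketch (the function collect is illustrative), reusing func from above:

def collect(result):
    print('all done:', result)

with Pool(processes=4) as pool:
    res = pool.map_async(func, [1, 1, 1], callback=collect)
    print('Control back!')
    res.wait()   # collect() prints once the whole map has finished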

Different jobs to different processes

[33]:
def f1(n):
    time.sleep(1)
    return n

def f2(n):
    time.sleep(1)
    return n**2

def f3(n):
    time.sleep(1)
    return n**3

def f4(n):
    time.sleep(1)
    return n**4
[34]:
%%time

with Pool(processes=4) as pool:
    res = []
    for i, f in enumerate([f4, f2, f3, f1]):
        res.append((i, pool.apply(f, [2])))
    print(res)
[(0, 16), (1, 4), (2, 8), (3, 2)]
CPU times: user 8 ms, sys: 24 ms, total: 32 ms
Wall time: 4.13 s
[35]:
%%time

with Pool(processes=4) as pool:
    res = []
    for i, f in enumerate([f4, f2, f3, f1]):
        res.append((i, pool.apply_async(f, [2])))
    print([(i, r.get()) for i, r in res])
[(0, 16), (1, 4), (2, 8), (3, 2)]
CPU times: user 8 ms, sys: 16 ms, total: 24 ms
Wall time: 1.12 s

Creating individual processes

If you need more control over individual processes than Pool provides, for example to share information across processes, you can work with individual workers and thread-safe memory structures. This is included just for completeness, as most data processing tasks do not require this level of control.

[36]:
def f(i):
    time.sleep(np.random.random())
    print(os.getpid(), i)
[37]:
for i in range(10):
    p = mp.Process(target=f, args=(i,))
    p.start()
    p.join()
45199 0
45202 1
45205 2
45208 3
45211 4
45214 5
45217 6
45220 7
45223 8
45226 9

Using Queues to share information between processes

[38]:
def f1(q, i):
    time.sleep(np.random.random())
    q.put((os.getpid(), i))
[39]:
q = mp.Queue()

res = []
for i in range(10):
    p = mp.Process(target=f1, args=(q,i,))
    p.start()
    res.append(q.get())
    p.join()

res
[39]:
[(45229, 0),
 (45231, 1),
 (45233, 2),
 (45235, 3),
 (45237, 4),
 (45239, 5),
 (45241, 6),
 (45243, 7),
 (45245, 8),
 (45247, 9)]
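
In the loop above each worker is started and joined one at a time. To let the workers actually run concurrently, a sketch that starts them all first and only then drains the queue (draining before joining avoids blocking on unread queue items):

q = mp.Queue()
procs = [mp.Process(target=f1, args=(q, i)) for i in range(10)]

for p in procs:
    p.start()

res = [q.get() for _ in range(10)]   # one item per worker; get() blocks as needed

for p in procs:
    p.join()

sorted(res, key=lambda t: t[1])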

Using Value and Array for sharing data

Counting number of jobs (1)

This does not work.

[40]:
def f2(i):
    global counter
    counter = counter + 1
    print(os.getpid(), i)

Checking

[41]:
counter = 0
f2(10)
print(counter)
45068 10
1
[42]:
counter = 0

for i in range(10):
    p = mp.Process(target=f2, args=(i,))
    p.start()
    p.join()
45249 0
45252 1
45255 2
45258 3
45261 4
45264 5
45267 6
45270 7
45273 8
45276 9

Note that separate processes have their own memory and DO NOT share global memory.

[43]:
counter
[43]:
0

Counting number of jobs (2)

We can use shared memory to do this, but it is slow because multiprocessing has to ensure that only one process gets to use counter at any one time. multiprocessing provides Value and Array shared memory variables, but you can also convert arbitrary Python variables into shared memory objects (less efficient).

[44]:
def f3(i, counter, store):
    counter.value += 1
    store[os.getpid() % 10] += 1
[45]:
%%time

counter = mp.Value('i', 0)
store = mp.Array('i', [0]*10)

for i in range(int(1e2)):
    p = mp.Process(target=f3, args=(i, counter, store))
    p.start()
    p.join()

print(counter.value)
print(store[:])
100
[10, 10, 10, 10, 10, 10, 10, 10, 10, 10]
CPU times: user 52 ms, sys: 256 ms, total: 308 ms
Wall time: 735 ms
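
In the cell above the processes are joined one at a time, so the increments never actually race. If the workers ran concurrently, the read-modify-write in counter.value += 1 would need to hold the Value's lock; a sketch of the safer variant (the name f3_locked is illustrative):

def f3_locked(i, counter, store):
    with counter.get_lock():             # make the increment atomic
        counter.value += 1
    with store.get_lock():
        store[os.getpid() % 10] += 1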

Avoiding use of shared memory

Counting number of jobs (3)

We should try to avoid using shared memory as much as possible in parallel jobs, since the required synchronization drastically reduces efficiency. One useful approach is the map-reduce pattern. We should also use Pool to reuse processes rather than spawning too many of them.

[46]:
def f4(i):
    return (os.getpid(), 1, i)
[47]:
%%time

# map step
with mp.Pool(processes=4) as pool:
    res = pool.map(f4, range(int(1e2)))
CPU times: user 4 ms, sys: 20 ms, total: 24 ms
Wall time: 150 ms
Reduce steps
[48]:
res = np.array(res)
[49]:
res[np.random.choice(len(res), 10)]
[49]:
array([[45381,     1,    57],
       [45379,     1,     2],
       [45379,     1,     4],
       [45380,     1,    10],
       [45381,     1,    19],
       [45381,     1,    58],
       [45379,     1,    93],
       [45380,     1,     7],
       [45381,     1,    60],
       [45379,     1,    42]])
[50]:
import pandas as pd
[51]:
df = pd.DataFrame(res, columns=['pid', 'one', 'i'])
[52]:
df.groupby('pid').sum()
[52]:
       one     i
pid
45379   28  1211
45380   28  1358
45381   28  1652
45382   16   729
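
The reduce step does not require pandas; a sketch of the same per-pid tally using collections.Counter over the (pid, 1, i) rows from the map step:

from collections import Counter

counts = Counter()
totals = Counter()
for pid, one, i in res:
    counts[int(pid)] += int(one)
    totals[int(pid)] += int(i)
counts, totals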