Multi-Core Parallelism

In [7]:
%load_ext cython
In [8]:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import multiprocessing as mp
from multiprocessing import Pool, Value, Array
import time
from numba import njit

Vanilla Python

In [12]:
def mc_pi(n):
    s = 0
    for i in range(n):
        x = np.random.uniform(-1, 1)
        y = np.random.uniform(-1, 1)
        if (x**2 + y**2) < 1:
            s += 1
    return 4*s/n
In [5]:
%%time

res = [mc_pi(int(1e7)) for i in range(10)]
CPU times: user 2min 35s, sys: 362 ms, total: 2min 35s
Wall time: 2min 35s

Using numba to speed up computation

In [13]:
@njit()
def mc_pi_numba(n):
    s = 0
    for i in range(n):
        x = np.random.uniform(-1, 1)
        y = np.random.uniform(-1, 1)
        if (x**2 + y**2) < 1:
            s += 1
    return 4*s/n
In [17]:
%%time

res = [mc_pi_numba(int(1e7)) for i in range(10)]
CPU times: user 3.46 s, sys: 12.1 ms, total: 3.47 s
Wall time: 3.46 s
In [18]:
np.array(res)
Out[18]:
array([ 3.1405108,  3.1404128,  3.1425416,  3.1410152,  3.1415436,
        3.1424268,  3.141788 ,  3.1412084,  3.1411844,  3.14021  ])

Using cython to speed up computation

Note the use of an external C library (GNU Scientific Library or gsl) to replace numpy random number generators (which are slow for generating one number at a time). The GSL has already been packaged for use in Cython, so we just have to pip install it.

Install cythongsl if necessary and restart kernel.

! pip install cythongsl
In [9]:
%%cython -lgsl

import cython
from cython_gsl cimport *

@cython.cdivision(True)
def mc_pi_cython(int n):
    cdef gsl_rng_type * T
    cdef gsl_rng * r
    cdef double s = 0.0
    cdef double x, y
    cdef int i

    gsl_rng_env_setup()

    T = gsl_rng_default
    r = gsl_rng_alloc (T)

    for i in range(n):
        x = 2*gsl_rng_uniform(r) - 1
        y = 2*gsl_rng_uniform(r)- 1
        if (x**2 + y**2) < 1:
            s += 1
    return 4*s/n
In [15]:
%%time

res = [mc_pi_cython(int(1e7)) for i in range(10)]
CPU times: user 2.67 s, sys: 7.42 ms, total: 2.68 s
Wall time: 2.67 s
In [16]:
np.array(res)
Out[16]:
array([ 3.1414584,  3.1414584,  3.1414584,  3.1414584,  3.1414584,
        3.1414584,  3.1414584,  3.1414584,  3.1414584,  3.1414584])

The concurrent.futures module

Concurrent processes are processes that will return the same results regardless of the order in which they were executed. A “future” is something that will return a result sometime in the future. The concurrent.futures module provides an event handler, which can be fed functions to be scheduled for future execution. This provides us with a simple model for parallel execution on a multi-core machine.

While concurrent futures provide a simpler interface, it is slower and less flexible when compared with using multiprocessing for parallel execution.

Using processes in parallel with ProcessPoolExecutor

We get a linear speedup as expected.

In [36]:
%%time

with ProcessPoolExecutor(max_workers=4) as pool:
    res = pool.map(mc_pi_cython, [int(1e7) for i in range(10)])
CPU times: user 13 ms, sys: 32.3 ms, total: 45.3 ms
Wall time: 958 ms
In [37]:
np.array(list(res))
Out[37]:
array([ 3.1414584,  3.1414584,  3.1414584,  3.1414584,  3.1414584,
        3.1414584,  3.1414584,  3.1414584,  3.1414584,  3.1414584])

When you have many jobs

The futures object gives fine control over the process, such as adding callbacks and canceling a submitted job, but is computationally expensive. We can use the chunksize argument to reduce this cost when submitting many jobs.

Using default chunksize of 1 for 10000 jobs

The total amount of computation whether you have 10 jobs of size 10,000,000 or 10,000 jobs of size 10,000 is essentially the same, so we would expect them both to take about the same amount of time.

In [12]:
%%time

with ProcessPoolExecutor(max_workers=4) as pool:
    res = pool.map(mc_pi_cython, [int(1e4) for i in range(int(1e4))])
CPU times: user 5.07 s, sys: 1.85 s, total: 6.92 s
Wall time: 6.25 s

Using chunksize of 100

In [13]:
%%time

with ProcessPoolExecutor(max_workers=4) as pool:
    res = pool.map(mc_pi_cython, [int(1e4) for i in range(int(1e4))], chunksize=100)
CPU times: user 98.2 ms, sys: 74.9 ms, total: 173 ms
Wall time: 888 ms

Fine control of processes

Status of processes

In [ ]:
with ProcessPoolExecutor(max_workers=4) as pool:
    a = pool.submit(f2, 1, 1)
    b = pool.submit(f2, 1,2)
    c = pool.submit(f1, 10)

    print('a running:', a.running())
    print('a done:', a.done())

    print('b running:', b.running())
    print('b done:', b.done())

    print('c running:', c.running())
    print('c done:', c.done())

    print('a result', a.result())
    print('b result', b.result())
    print('c result', c.result())

Canceling jobs and adding callbacks

In [ ]:
njobs = 24

res = []

with ProcessPoolExecutor(max_workers=4) as pool:

    for i in range(njobs):
        res.append(pool.submit(f2, *np.random.rand(2)))
        if i % 2 == 0:
            res[i].add_done_callback(lambda future: print("Process done!"))
    res[4].cancel()
    if res[4].cancelled():
        print("Process 4 cancelled")

    for i, x in enumerate(res):
        while x.running():
            print("Running")
            time.sleep(1)
        if not x.cancelled():
            print(x.result())

Functions with multiple arguments

In [10]:
def f(a, b):
    return a + b

Using a function adapter

In [14]:
def f_(args):
    return f(*args)
In [1]:
xs = np.arange(24)
chunks = np.array_split(xs, xs.shape[0]//2)
In [2]:
chunks
Out[2]:
[array([0, 1]),
 array([2, 3]),
 array([4, 5]),
 array([6, 7]),
 array([8, 9]),
 array([10, 11]),
 array([12, 13]),
 array([14, 15]),
 array([16, 17]),
 array([18, 19]),
 array([20, 21]),
 array([22, 23])]
In [15]:

with ProcessPoolExecutor(max_workers=4) as pool:
    res = pool.map(f_, chunks)
list(res)
Out[15]:
[1, 5, 9, 13, 17, 21, 25, 29, 33, 37, 41, 45]

Using processes in parallel with ThreadPoolExecutor

We do not get any speedup because the GIL only allows one thread to run at one time.

In [38]:
%%time

with ThreadPoolExecutor(max_workers=4) as pool:
    res = pool.map(mc_pi_cython, [int(1e7) for i in range(10)])
CPU times: user 2.68 s, sys: 27.3 ms, total: 2.7 s
Wall time: 2.68 s
In [39]:
np.array(list(res))
Out[39]:
array([ 3.1414584,  3.1414584,  3.1414584,  3.1414584,  3.1414584,
        3.1414584,  3.1414584,  3.1414584,  3.1414584,  3.1414584])

Turning off the GIL in cython

In [16]:
%%cython -lgsl

import cython
from cython_gsl cimport *

@cython.cdivision(True)
def mc_pi_cython_nogil(int n):
    cdef gsl_rng_type * T
    cdef gsl_rng * r
    cdef double s = 0.0
    cdef double x, y
    cdef int i

    gsl_rng_env_setup()

    T = gsl_rng_default
    r = gsl_rng_alloc (T)

    with cython.nogil:
        for i in range(n):
            x = 2*gsl_rng_uniform(r) - 1
            y = 2*gsl_rng_uniform(r)- 1
            if (x**2 + y**2) < 1:
                s += 1
    return 4*s/n

Using processes in parallel with ThreadPoolExecutor and nogil

We finally get the linear speedup expected. Note that threads are actually faster than processes because there is less overhead to using a thread.

In [40]:
%%time

with ThreadPoolExecutor(max_workers=4) as pool:
    res = pool.map(mc_pi_cython_nogil, [int(1e7) for i in range(10)])
CPU times: user 2.67 s, sys: 4.2 ms, total: 2.68 s
Wall time: 803 ms
In [41]:
np.array(list(res))
Out[41]:
array([ 3.1414584,  3.1414584,  3.1414584,  3.1414584,  3.1414584,
        3.1414584,  3.1414584,  3.1414584,  3.1414584,  3.1414584])

Using multiprocessing

One nice thing about using multiprocessing is that it works equally well for small numbers of large jobs, or large numbers of small jobs out of the box.

In [ ]:
mp.
In [19]:
%%time

with mp.Pool(processes=4) as pool:
    res = pool.map(mc_pi_cython, [int(1e7) for i in range(10)])
CPU times: user 8.74 ms, sys: 23.6 ms, total: 32.4 ms
Wall time: 858 ms
In [20]:
%%time

with mp.Pool(processes=4) as pool:
    res = pool.map(mc_pi_cython, [int(1e4) for i in range(int(1e4))])
CPU times: user 14.5 ms, sys: 26.4 ms, total: 40.9 ms
Wall time: 753 ms

Creating individual processes

In [22]:
def f(i):
    time.sleep(np.random.random())
    print(os.getpid(), i)
In [23]:
for i in range(10):
    p = mp.Process(target=f, args=(i,))
    p.start()
    p.join()
7910 0
7911 1
7912 2
7913 3
7914 4
7915 5
7916 6
7917 7
7918 8
7919 9

Functions with multiple arguments

Multiprocessing Pool has a starmap method that removes the need to write a wrapper function.

In [ ]:
def f(a, b):
    return a + b
In [18]:
xs = np.arange(24)
with Pool(processes=4) as pool:
    res = pool.starmap(f, np.array_split(xs, xs.shape[0]//2))
list(res)
Out[18]:
[1, 5, 9, 13, 17, 21, 25, 29, 33, 37, 41, 45]

Partial application

Sometimes, functools.partial can be used to reduce the number of arguments needed to just one.

In [19]:
def f(a, b):
    return a * b
In [20]:
from functools import partial

fp = partial(f, b=2)
In [25]:
xs = np.arange(24)
with Pool(processes=4) as pool:
    res = pool.map(fp, xs)
np.array(list(res))
Out[25]:
array([ 0,  2,  4,  6,  8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32,
       34, 36, 38, 40, 42, 44, 46])

How do we get a return value from a process?

In [24]:
def f1(q, i):
    time.sleep(np.random.random())
    q.put((os.getpid(), i))
In [25]:
q = mp.Queue()

res = []
for i in range(10):
    p = mp.Process(target=f1, args=(q,i,))
    p.start()
    res.append(q.get())
    p.join()

res
Out[25]:
[(7920, 0),
 (7921, 1),
 (7922, 2),
 (7924, 3),
 (7925, 4),
 (7926, 5),
 (7927, 6),
 (7928, 7),
 (7929, 8),
 (7930, 9)]

Counting number of jobs (1)

In [26]:
def f2(i):
    global counter
    counter = counter + 1
    print(os.getpid(), i)

Checking

In [27]:
counter = 0
f2(10)
print(counter)
7869 10
1
In [28]:
counter = 0

for i in range(10):
    p = mp.Process(target=f2, args=(i,))
    p.start()
    p.join()
7931 0
7932 1
7933 2
7934 3
7935 4
7936 5
7937 6
7938 7
7939 8
7940 9

Note that separate processes have their own memory and DO NOT share global memory

In [29]:
counter
Out[29]:
0

Counting number of jobs (2)

We can use shared memory to do this, but it is slow because multiprocessing has to ensure that only one process gets to use counter at any one time. Multiprocesing provides Value and Array shared memory variables, but you can also convert arbitrary Python variables into shared memory objects (less efficient).

In [30]:
def f3(i, counter, store):
    counter.value += 1
    store[os.getpid() % 10] += i
In [31]:
%%time

counter = mp.Value('i', 0)
store = mp.Array('i', [0]*10)

for i in range(int(1e2)):
    p = mp.Process(target=f3, args=(i, counter, store))
    p.start()
    p.join()

print(counter.value)
print(store[:])
100
[540, 450, 460, 470, 480, 490, 500, 510, 520, 530]
CPU times: user 102 ms, sys: 612 ms, total: 714 ms
Wall time: 2.68 s

Counting number of jobs (3)

We should try to avoid using shared memory as much as possible in parallel jobs as they drastically reduce efficiency. One useful approach is to use the map-reduce pattern. We should also use Pool to reuse processes rather than spawn too many of them. We will see much more of the map-reduc approach when we work with Spark.

In [32]:
def f4(i):
    return (os.getpid(), 1, i)
In [33]:
%%time

# map step
with mp.Pool(processes=10) as pool:
    res = pool.map(f4, range(int(1e2)))

#reeduce step
res = np.array(res)

counter = res[:, 1].sum()
print(counter)

store = np.zeros(10)
idx = res[:, 0] % 10
for i in range(10):
    store[i] = res[idx==i, 2].sum()

print(store)
100
[ 540.  450.  460.  470.  480.  490.  500.  510.  520.  530.]
CPU times: user 90.9 ms, sys: 359 ms, total: 450 ms
Wall time: 860 ms

Simplifying use of multiprocessing with the deco package

Install as usual with

pip install deco

The deco package provides two decorators that streamlines the execution of embarrassingly parallel programs. Under the hood, it uses multiprocessing.

In [1]:
import time

Serial version

In [2]:
def process_one(x):
    """Slow function (> 1 ms) to process one piece of data."""
    time.sleep(1)
    print(x)

def process_many(xs):
    for x in xs:
        process_one(x)
In [3]:
xs = np.arange(10)
In [4]:
%time process_many(xs)
0
1
2
3
4
5
6
7
8
9
CPU times: user 11.3 ms, sys: 4.18 ms, total: 15.5 ms
Wall time: 10 s

Parallel version

Add @concurrent to the functions that should be run in parallel, and @synchronized to the functions calling the concurrent processes. Because of deco overhead, it should only be used to parallelize functions that take over 1 ms to complete.

In [5]:
from deco import concurrent, synchronized
In [12]:
@concurrent()
def process_one(x):
    """Slow function to process one piece of data."""
    time.sleep(1)
    print(x)

@synchronized
def process_many(xs):
    for x in xs:
        process_one(x)
In [7]:
%time process_many(xs)
3
7
4
6
1
0
5
2
9
8
CPU times: user 21.9 ms, sys: 49.6 ms, total: 71.5 ms
Wall time: 2.07 s

Changing default number of processes

In [10]:
@concurrent(processes = 2)
def process_one(x):
    """Slow function (> 1 ms) to process one piece of data."""
    time.sleep(1)
    print(x)

@synchronized
def process_many(xs):
    for x in xs:
        process_one(x)
In [11]:
%time process_many(xs)
0
1
2
3
4
5
6
7
8
9
CPU times: user 21.9 ms, sys: 28 ms, total: 49.9 ms
Wall time: 5.05 s

Common issues with use of shared memory in parallel programs

Writing to shared memory requires careful coordination of processes, and many control and communication concepts are implemented in the multiprocessing library for this purpose, including semaphores, locks, barriers etc. We will not cover these concepts due to their complexity, choosing instead to decouple processes (leading to embarrassingly parallel problems) by making redundant copies of resources if necessary and reducing at a later stage if necessary. Most problems in statistical data analysis can be solved using this simple approach.

Race conditions

In the example below, up to 4 processes may be trying to increment and assign a new value to val at the same time. Because this takes two steps (increment the RHS, assign to LHS), it can happen that two or more processes increment at the same time, but this is only assigned and counted once.

In [3]:
def count1(i):
    val.value += 1

for run in range(3):
    val = Value('i', 0)
    with Pool(processes=4) as pool:
        pool.map(count1, range(1000))

    print(val.value)
436
389
378

It is usually easier and faster to make copies of resources for each process so that no sharing is required.

In [4]:
def count2(i):
    ix = os.getpid() % 4
    arr[ix] += 1

for run in range(3):
    arr = Array('i', [0]*4)

    with Pool(processes=4) as pool:
        pool.map(count2, range(1000))

    print(arr[:], np.sum(arr))
[252, 252, 252, 244] 1000
[252, 252, 244, 252] 1000
[252, 252, 252, 244] 1000

Deadlock

Suppose there are two processes P1 and P2 and two resources A and B. Suppose P1 has a lock on A and will only release A after it gains B, while P2 has a lock on B and will only release the lock after it gains A. The two processes are doomed to wait forever; this is known as a deadlock and can occur when concurrent processes compete to have exclusive access to the same shared resources. A classic model of deadlock is the Dining Philosophers Problem.

We will not show any examples since it will simply freeze the notebook.

In [ ]: