[1]:
%matplotlib inline

Parallel Programming

This is an elementary treatment that covers simple use cases that often arise in statistics and data science, and is focused mostly on parallelism on a multi-core computer. More specialized frameworks for parallel programming, such as those listed below, are not covered:

  • Message Passing Interface (MPI). Framework for parallelism based on message passing on a compute cluster. Often used when parallel processes need to frequently communicate with each other. More common in scientific than data analysis applications.

  • GPU computing (CUDA, OpenCL). Framework for exploiting the massively parallel but basic compute units available in modern GPUs. Ideal when you need a large number of simple arithmetic function calls - for example, in deep learning. The CuPy library looks like a promising GPU-accelerated alternative to NumPy if you have a modern NVIDIA GPU.

  • Distributed computing (Spark, Dask). Framework for master-worker parallelism on a cluster of commodity machines. The master builds a graph of task dependencies and schedules tasks to execute in the appropriate order.

This first lecture is mainly for illustrating concepts. The second will provide more examples of using parallelism in Python programs.

Checking machine capabilities on a Mac

[2]:
! sysctl hw.physicalcpu hw.logicalcpu
sysctl: cannot stat /proc/sys/hw/physicalcpu: No such file or directory
sysctl: cannot stat /proc/sys/hw/logicalcpu: No such file or directory

For a Linux system, use this instead

[3]:
! lscpu --all --extended
CPU NODE SOCKET CORE L1d:L1i:L2:L3 ONLINE
0   0    0      0    0:0:0:0       yes
1   0    0      1    1:1:1:0       yes
2   0    0      2    2:2:2:0       yes
3   0    0      3    3:3:3:0       yes
4   0    0      4    4:4:4:0       yes
5   0    0      5    5:5:5:0       yes
6   0    0      6    6:6:6:0       yes
7   0    0      7    7:7:7:0       yes
8   0    0      8    8:8:8:0       yes
9   0    0      9    9:9:9:0       yes
10  0    0      10   10:10:10:0    yes
11  0    0      11   11:11:11:0    yes
12  0    0      12   12:12:12:0    yes
13  0    0      13   13:13:13:0    yes
14  0    0      14   14:14:14:0    yes
15  0    0      15   15:15:15:0    yes

Common sense

  • Make it right before trying to make it fast

  • Weigh the trade-off between programmer and program time

  • Don’t blindly optimize - profile code before optimization

  • A new algorithm or data structure can reduce the complexity class; parallelism generally only offers linear speed ups

Basic Concepts

Concurrent, parallel, distributed

  • Concurrent programs have tasks with overlapping lifetimes, and concurrent programming is based on units of execution that can be run in any order with the same final result. Concurrent programs can but need not be executed in parallel - for example, the time-slicing enforced by Python’s Global Interpreter Lock (GIL), which allows only one thread to execute at any point in time, is an example of concurrency without parallelism.

  • Parallel programs run at the same time, for example, on multiple cores. If there is no need to control access to shared resources, these are known as embarrassingly parallel programs. All parallel programs are thus concurrent, but not vice versa.

  • Distributed programs run over multiple computers. The term is most often associated with “Big Data” problems, where data transfer is the computing bottleneck - hence the philosophy of “bring the computation to the data”.

Synchronous and asynchronous calls

When there is concurrent access to a shared resource, errors can arise. For example, consider the following code executed by two concurrent processes which share the variable i with an initial value of 0:

i = i + 1

The following may happen: Process 1 reads (i = 0), Process 2 reads (i = 0), Process 1 updates (i = 1), Process 2 overwrites the value put in by Process 1 (i = 1). Hence, after two updates, the value of i is 1 and not 2.

To avoid this problem, the first process to access the resource may lock it for exclusive access:

Process 1 reads and locks (i = 0), Process 2 tries to access i but it is locked, Process 1 updates and releases the lock (i = 1), Process 2 now reads and locks (i = 1), and updates and unlocks (i = 2). Hence, after two updates, the value of i is 2 as it should be. Note that because the processes are synchronous, each process blocks access to the shared resource, forcing other processes to wait - in Python, this waiting manifests as lack of access to the interpreter until the computation is complete. In this context, synchronous and blocking have the same meaning.

Asynchronous or non-blocking calls were designed to allow access to the interpreter at all times - an asynchronous function call returns control to the interpreter immediately, and returns a future object that can be used to check at a later point in time if the computation is completed and if so, to return the answer.
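The future object described above can be sketched with the standard-library concurrent.futures module. This is only an illustration: slow_square is a hypothetical stand-in for a long computation, and a thread pool is used purely to keep the example small.

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep

def slow_square(x):
    """Hypothetical stand-in for a long-running computation."""
    sleep(0.2)
    return x * x

with ThreadPoolExecutor(max_workers=1) as pool:
    # submit returns immediately with a Future; the caller is not blocked
    future = pool.submit(slow_square, 3)
    # done() reports whether the computation has finished, without waiting
    done_immediately = future.done()
    # result() blocks until the answer is available, then returns it
    answer = future.result()
```

Between submit and result the caller is free to do other work; only the call to result waits.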

Concurrency is difficult

Designing programs that share resources or have other dependencies to run concurrently in an effective way is challenging. One of the main reasons for the increasing popularity of functional programming is that concurrency is generally easier with a functional programming style - for example, the famous map-reduce framework for distributed computing uses the map and reduce abstractions of functional programming.

Here we illustrate two famous problems that may arise in concurrent programs:

Race condition

A race condition is one where the result depends on the order of accessing a shared resource, as in the simple example of updating i given above.

In the example below, up to 4 processes may be trying to increment val at the same time. Because val.value += 1 is not atomic (read the current value, add one, write the result back), two or more processes can read the same value at the same time, so that several increments are written back as one and counts are lost.

[4]:
from multiprocessing import Pool, Value

How the code works will become clear in the next lecture, but the basic idea is that we have 4 processes trying to increment a value, and because of race conditions, this gives the wrong final count.

[5]:
def count_with_value(i):
    val.value += 1

val = Value('i', 0)
with Pool(processes=4) as pool:
    pool.map(count_with_value, range(1000))

val.value
[5]:
498

Preventing race conditions by locking resources

[6]:
from multiprocessing import Lock
[7]:
lock = Lock()

def count_with_lock(i, lock=lock):
    # The context manager acquires the lock and always releases it on exit
    with lock:
        val.value += 1

val = Value('i', 0)
with Pool(processes=4) as pool:
    pool.map(count_with_lock, range(1000))

val.value
[7]:
1000

Preventing race conditions by duplicating resources

It is usually easier and faster to make copies of resources for each process so that no sharing is required.

[8]:
import os
from multiprocessing import Array
import multiprocessing as mp
[9]:
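def count_with_array(i):
    # Each worker writes to its own slot, so no lock is needed. This assumes
    # the 4 worker PIDs are distinct modulo 4 (typical when they are forked
    # one after another); if two collide, they share a slot and can race.
    ix = mp.current_process().ident % 4
    arr[ix] += 1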

arr = Array('i', [0]*4)

with Pool(processes=4) as pool:
    pool.map(count_with_array, range(1000))

arr[:], sum(arr)
[9]:
([252, 307, 189, 252], 1000)

Deadlock

Suppose each process needs both resources i and j to compute and return a value. Process 1 reads and locks i but before it can read j, Process 2 reads and locks j. Now both processes are waiting but cannot access the other shared resource, and hence the program hangs forever.

There is a story about the dining philosophers to help you remember this - a group of philosophers are seated at a round table - on the left of each philosopher is a fork and on the right is a knife. Philosophers need a fork and a knife to eat. At the beginning, each philosopher grabs the utensil to the right - now none of the philosophers have both a knife and a fork, and they starve to death.

To break deadlock, a possible strategy is to release locked resources upon failure and wait a random amount of time before trying to acquire them again.
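The release-and-retry strategy can be sketched with two threads that request two locks in opposite order. This is a minimal illustration, not a general-purpose recipe: the names lock_i, lock_j and worker are hypothetical, and the timeout and back-off intervals are arbitrary choices.

```python
import random
import threading
import time

lock_i = threading.Lock()
lock_j = threading.Lock()
finished = []

def worker(name, first, second):
    """Acquire two locks; on failure release everything, back off, retry."""
    while True:
        with first:
            # Try the second lock, but give up after a short timeout
            if second.acquire(timeout=0.01):
                try:
                    finished.append(name)  # both resources held: do the work
                    return
                finally:
                    second.release()
        # Leaving the with block released `first`; wait a random time, retry
        time.sleep(random.uniform(0, 0.01))

# The two workers request the locks in opposite order, the classic deadlock setup
t1 = threading.Thread(target=worker, args=("P1", lock_i, lock_j))
t2 = threading.Thread(target=worker, args=("P2", lock_j, lock_i))
t1.start()
t2.start()
t1.join()
t2.join()
```

The random back-off breaks the symmetry between the two workers, so one of them eventually gets both locks. Another common alternative, not shown here, is to have every worker acquire the locks in the same fixed order.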

Amdahl’s and Gustafson’s laws

Suppose you had a million-core machine. How much can you speed up your program by parallelizing it?

Amdahl recognized that the degree of speed-up is driven by the ratio of serial to parallelizable code segments. Serial parts are those where the order is essential \(a \to b \to c\). Suppose the serial section of code takes up 10% of the total run-time. Then the maximum speed-up is 10-fold - the serial part still takes 10% of the original time, and you throw one million minus one cores at the parallelizable parts, driving their execution time towards zero, giving the 10-fold speed-up.
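The 10-fold limit follows from the usual statement of Amdahl's law: with serial fraction \(s\) and \(N\) cores, the speed-up is \(1 / (s + (1 - s)/N)\), which tends to \(1/s\) as \(N\) grows. A small sketch of that formula, where the function name amdahl_speedup is our own and the model assumes the parallel part scales perfectly:

```python
def amdahl_speedup(serial_fraction, n_cores):
    """Speed-up predicted by Amdahl's law for a fixed problem size."""
    parallel_fraction = 1.0 - serial_fraction
    return 1.0 / (serial_fraction + parallel_fraction / n_cores)

# With 10% serial code the speed-up approaches, but never reaches, 10
for n_cores in (1, 10, 100, 1_000_000):
    print(n_cores, round(amdahl_speedup(0.1, n_cores), 2))
```

Even a million cores cannot push the speed-up past \(1/s = 10\) in this example.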

Gustafson pointed out that as the size of the problem increases, the fraction of time spent in serial code segments typically goes down. For example, MCMC is inherently serial across steps, but for very large data sets, within each MCMC step, there is typically a need to calculate the likelihood of each data point, and this can be parallelized. We used this idea to achieve over two orders of magnitude speed-up for fitting statistical mixture models using GPU computing.
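Gustafson's observation is usually written as a scaled speed-up, \(s + (1 - s) N\). Here \(s\) is the serial fraction of run time measured on the \(N\)-core machine, and the problem size is assumed to grow with \(N\). That reading of \(s\) comes from the standard textbook form rather than from this lecture. A minimal sketch, with a function name of our own choosing:

```python
def gustafson_speedup(serial_fraction, n_cores):
    """Scaled speed-up predicted by Gustafson's law for a growing problem."""
    return serial_fraction + (1.0 - serial_fraction) * n_cores

# With 10% serial time the scaled speed-up keeps growing with the core count
for n_cores in (1, 10, 100):
    print(n_cores, gustafson_speedup(0.1, n_cores))
```

Unlike the fixed-size case, this speed-up is not capped at \(1/s\), because the parallel part of the work grows with the machine.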

Opportunities for parallelism

There are 3 common areas where parallelism can be useful in an algorithm:

Data

  • Decomposition of arrays along rows, columns, blocks

  • Decomposition of trees into sub-trees
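As an illustration of decomposing an array along rows, the sketch below splits a small list-of-lists "matrix" into contiguous row blocks, sums each block in a separate worker, and combines the partial sums. The helper names split_rows and block_sum are hypothetical, and a thread pool is used only to show the structure.

```python
from concurrent.futures import ThreadPoolExecutor

def split_rows(rows, n_blocks):
    """Split a list of rows into n_blocks contiguous, nearly equal blocks."""
    size, extra = divmod(len(rows), n_blocks)
    blocks, start = [], 0
    for b in range(n_blocks):
        # The first `extra` blocks get one additional row
        stop = start + size + (1 if b < extra else 0)
        blocks.append(rows[start:stop])
        start = stop
    return blocks

def block_sum(block):
    """Sum all entries in one block of rows."""
    return sum(sum(row) for row in block)

matrix = [[r * 4 + c for c in range(4)] for r in range(10)]  # 10 x 4 toy array
blocks = split_rows(matrix, 3)
with ThreadPoolExecutor(max_workers=3) as pool:
    partial_sums = list(pool.map(block_sum, blocks))
total = sum(partial_sums)
```

Because each block is processed independently, the same pattern carries over to column or tile decompositions by changing only the splitting function.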

Tasks

  • Loops

  • Function calls

Pipelines

If there is a sequence of dependent stages, then as soon as one stage completes and hands off to the next stage, it is ready to accept a job from the previous stage.

  • Data processing pipeline

\[\begin{split}a \\ a \to b \\ a \to b \to c \\ a \to b \to c \\ a \to b \to c \\ b \to c \\ c\end{split}\]
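The staggered pattern in the diagram can be sketched with one thread per stage connected by queues. This is an illustrative toy: the stage functions (add one, multiply by ten) and the SENTINEL end-of-stream marker are assumptions made for the example.

```python
import queue
import threading

SENTINEL = object()  # marks the end of the stream of jobs

def stage(func, inbox, outbox):
    """Apply func to each item from inbox and pass the result downstream."""
    while True:
        item = inbox.get()
        if item is SENTINEL:
            outbox.put(SENTINEL)  # propagate shutdown to the next stage
            break
        outbox.put(func(item))

q_ab, q_bc, q_out = queue.Queue(), queue.Queue(), queue.Queue()
threads = [
    threading.Thread(target=stage, args=(lambda x: x + 1, q_ab, q_bc)),    # stage b
    threading.Thread(target=stage, args=(lambda x: x * 10, q_bc, q_out)),  # stage c
]
for t in threads:
    t.start()

# Stage a: the producer feeds jobs in, then signals completion
for job in range(5):
    q_ab.put(job)
q_ab.put(SENTINEL)

for t in threads:
    t.join()

# Drain the final queue into a list of results
results = []
while True:
    item = q_out.get()
    if item is SENTINEL:
        break
    results.append(item)
```

While stage c is processing one job, stage b is already free to work on the next, which is where the overlap in the diagram comes from.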

Common parallelization idioms

  • Loop parallelism automatically performs the tasks in a loop in parallel

  • Fork-join repeatedly splits execution into multiple branches (fork), then merges the branches (join), and is often used for recursive problems.

  • Single program multiple data (SPMD), where the same program runs on multiple cores on one or more computers, but processes different inputs.

  • Master-worker splits processes into a master that generates subproblems for multiple workers to complete
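The fork-join idiom can be sketched as a recursive sum that forks a thread for one half of the data and joins it before combining. The function name fork_join_sum and the threshold value are our own choices. Because of the GIL this shows the structure only and is not expected to run faster than the built-in sum.

```python
import threading

def fork_join_sum(data, threshold=1000):
    """Sum data by recursively forking a thread for the left half."""
    if len(data) <= threshold:
        return sum(data)  # small enough: solve directly
    mid = len(data) // 2
    left_result = []
    left = threading.Thread(
        target=lambda: left_result.append(fork_join_sum(data[:mid], threshold))
    )
    left.start()  # fork: the left half runs in a new thread
    right_sum = fork_join_sum(data[mid:], threshold)  # right half runs here
    left.join()  # join: wait for the forked branch to finish
    return left_result[0] + right_sum

total = fork_join_sum(list(range(10_000)))
```

The threshold stops the recursion from forking a thread for tiny subproblems, where the overhead would dominate.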

Loop parallelism with joblib

[10]:
from joblib import Parallel, delayed
from time import sleep
[11]:
def func(n):
    sleep(n)
    return n
[12]:
%%time

[func(1) for i in range(4)]
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 4 s
[12]:
[1, 1, 1, 1]

The joblib library has a list comprehension idiom for loop parallelism.

[13]:
%%time

Parallel(n_jobs=4)(delayed(func)(1) for i in range(4))
CPU times: user 16 ms, sys: 32 ms, total: 48 ms
Wall time: 1.37 s
[13]:
[1, 1, 1, 1]

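Note that the delayed function is just a technical device: delayed(func)(args) records the function and its arguments without calling it, so that tasks can be written with the usual function call syntax and executed later by Parallel.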
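To make the role of delayed concrete, here is a hypothetical stand-in, my_delayed, that mimics the idea in a few lines. It is a sketch of the concept rather than joblib's actual source: the wrapper captures a call as a (function, args, kwargs) triple so that something else can run it later.

```python
def my_delayed(function):
    """Hypothetical stand-in for delayed: capture a call without running it."""
    def capture(*args, **kwargs):
        return function, args, kwargs
    return capture

def add(x, y=0):
    return x + y

task = my_delayed(add)(1, y=2)  # looks like a call, but nothing runs yet
function, args, kwargs = task
result = function(*args, **kwargs)  # an executor would make this call later
```

Writing add(1, y=2) directly inside the generator would run the function immediately in the parent process, leaving nothing for the workers to do.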

[14]:
def func_args(n, *args, **kwargs):
    sleep(n)
    return args, kwargs
[15]:
%%time

Parallel(n_jobs=4)(delayed(func_args)(1, 2, 3, a=4, b=5) for i in range(4))
CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 1.01 s
[15]:
[((2, 3), {'a': 4, 'b': 5}),
 ((2, 3), {'a': 4, 'b': 5}),
 ((2, 3), {'a': 4, 'b': 5}),
 ((2, 3), {'a': 4, 'b': 5})]

In the next lecture we will see the use of parallel ranges for loop parallelism with numba and cython.

Threads and processes

We can think of threads and processes as independent workers. The main difference is that threads run in the same memory space, while each process has its own memory space. This means that threads are “light-weight” compared to processes - they are faster to create and consume fewer resources.

However, because threads run in the same memory space, concurrency issues such as race conditions and deadlock can occur. Because of this, the most commonly used Python interpreter (CPython) uses the Global Interpreter Lock (GIL) to ensure that only a single thread is operating in a Python program at any one time, and switches between multiple threads to give the illusion of parallelism. The take-home message is that you can use threads for parallel tasks that are slow because of latency (e.g. network request), but should use processes for parallel tasks that are compute-intensive (e.g. mathematical calculations).
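A small sketch of what "same memory space" means in practice: several threads append to one list owned by the main program, and the main program sees every change. The names shared and append_from_thread are hypothetical. A separate process would instead work on its own copy unless the data is explicitly shared, which is not shown here.

```python
import threading

shared = []  # lives in the one memory space that all threads see

def append_from_thread(value):
    """Mutate the shared list from inside a worker thread."""
    shared.append(value)

threads = [threading.Thread(target=append_from_thread, args=(k,)) for k in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# All four values are visible here, though their order depends on scheduling
print(sorted(shared))
```

This convenience is also the source of the race conditions mentioned above, since nothing stops two threads from updating the same object at once.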

[16]:
def func_with_latency(n):
    sleep(n)
    return n
[17]:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

Threads work well for high-latency tasks - e.g. disk access, network requests, awaiting user input - here simulated with sleep.

[18]:
%%time

with ProcessPoolExecutor(max_workers=4) as pool:
    pool.map(func_with_latency, [1,1,1,1])
CPU times: user 16 ms, sys: 8 ms, total: 24 ms
Wall time: 1.03 s
[19]:
%%time

with ThreadPoolExecutor(max_workers=4) as pool:
    pool.map(func_with_latency, [1,1,1,1])
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 1 s

But not so well for compute-intensive tasks, because the GIL lets only one thread execute Python code at a time.

[20]:
import math
[21]:
def func_compute_intesnive(n):
    s = 0
    for i in range(1, n+1):
        s += math.exp(math.log(math.sqrt(math.pow(i, 2.0))))
    return s
[22]:
n = 1000000
[23]:
%%time

func_compute_intesnive(n)
CPU times: user 588 ms, sys: 0 ns, total: 588 ms
Wall time: 587 ms
[23]:
500000500000.0
[24]:
%%time

with ProcessPoolExecutor(max_workers=4) as pool:
    pool.map(func_compute_intesnive, [n,n,n,n])
CPU times: user 8 ms, sys: 20 ms, total: 28 ms
Wall time: 663 ms
[25]:
%%time

with ThreadPoolExecutor(max_workers=4) as pool:
    pool.map(func_compute_intesnive, [n,n,n,n])
CPU times: user 2.52 s, sys: 44 ms, total: 2.56 s
Wall time: 2.52 s

Embarrassingly parallel programs

Many statistical problems can be easily decomposed into independent tasks or data sets. Here are several examples:

  • Monte Carlo integration

  • Multiple chains of MCMC

  • Bootstrap for confidence intervals

  • Power calculations by simulation

  • Permutation-resampling tests

  • Fitting same model on multiple data sets

These “low-hanging fruit” are great because they offer a path to easy parallelism with minimal complexity.
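As one example from the list, Monte Carlo integration splits naturally into independent tasks that share nothing. The sketch below estimates \(\pi\) from several independently seeded batches. The names count_hits and estimate_pi, the seeds and the batch sizes are all illustrative assumptions, and the executor class is a parameter so that a process pool can be substituted.

```python
import random
from concurrent.futures import ThreadPoolExecutor

def count_hits(task):
    """Count random points in the unit square that fall inside the quarter circle."""
    seed, n_points = task
    rng = random.Random(seed)  # independent, reproducible stream per task
    hits = 0
    for _ in range(n_points):
        x, y = rng.random(), rng.random()
        hits += x * x + y * y <= 1.0
    return hits

def estimate_pi(n_tasks=4, n_points=20_000, executor_cls=ThreadPoolExecutor):
    """Run independent batches in a pool and combine their hit counts."""
    tasks = [(seed, n_points) for seed in range(n_tasks)]
    with executor_cls(max_workers=n_tasks) as pool:
        hits = sum(pool.map(count_hits, tasks))
    return 4.0 * hits / (n_tasks * n_points)

pi_hat = estimate_pi()
```

The default thread pool keeps the example simple. Since this task is compute-intensive, passing executor_cls=ProcessPoolExecutor is the choice that matches the advice in the section on threads and processes.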
