Parallel Programming¶
References
- `threading - Thread-based parallelism <https://docs.python.org/3.6/library/threading.html>`__
- `multiprocessing - Process-based “threading” interface <https://docs.python.org/3.6/library/multiprocessing.html>`__
- multiprocess - a “better” `multiprocessing <https://github.com/uqfoundation/multiprocess>`__
- concurrent.futures - Launching parallel tasks
- Concurrency with Processes, Threads, and Coroutines
The goal is to design parallel programs that are flexible, efficient and simple.
Step 0: Start by profiling a serial program to identify bottlenecks
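As a minimal sketch of Step 0, the standard-library `cProfile` and `pstats` modules can show where a serial program spends its time. The functions `slow_sum_of_squares`, `cheap_setup` and `serial_program` are hypothetical stand-ins for real work, not part of the original notes.

```python
import cProfile
import io
import pstats


def slow_sum_of_squares(n):
    """Deliberately slow hot spot: a pure-Python loop."""
    total = 0
    for i in range(n):
        total += i * i
    return total


def cheap_setup():
    """Cheap step that should barely register in the profile."""
    return list(range(10))


def serial_program():
    cheap_setup()
    return slow_sum_of_squares(200_000)


profiler = cProfile.Profile()
profiler.enable()
result = serial_program()
profiler.disable()

# Sort by cumulative time so the most expensive call chains appear first
report = io.StringIO()
pstats.Stats(profiler, stream=report).sort_stats("cumulative").print_stats(5)
profile_text = report.getvalue()
print(profile_text)
```

The function that dominates the cumulative-time column is the first candidate for parallelization; in a notebook, the `%prun` magic gives a similar report.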
Step 1: Are there opportunities for parallelism?
- Can tasks be performed in parallel?
  - Function calls
  - Loops
- Can data be split and operated on in parallel?
  - Decomposition of arrays along rows, columns, blocks
  - Decomposition of trees into sub-trees
- Is there a pipeline with a sequence of stages?
  - Data preprocessing and analysis
  - Graphics rendering
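To make the data-splitting question concrete, here is a small sketch that cuts a sequence into contiguous chunks, computes a partial result per chunk, and combines the partial results. The helper `split_into_chunks` is a hypothetical name introduced for illustration, and the map step is run serially here; it is the part that could be handed to parallel workers.

```python
def split_into_chunks(data, n_chunks):
    """Split a sequence into n_chunks contiguous pieces of near-equal size."""
    size, extra = divmod(len(data), n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        # The first `extra` chunks get one additional element
        stop = start + size + (1 if i < extra else 0)
        chunks.append(data[start:stop])
        start = stop
    return chunks


data = list(range(10))
chunks = split_into_chunks(data, 3)

# Each partial sum depends only on its own chunk, so this map could run in parallel
partial_sums = list(map(sum, chunks))
total = sum(partial_sums)
print(chunks, partial_sums, total)
```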
Step 2: What is the nature of the parallelism?
- Linear
  - Embarrassingly parallel programs
- Recursive
  - Adaptive partitioning methods
Step 3: What is the granularity?
- 10s of jobs
- 1000s of jobs
Step 4: Choose an algorithm
- Organize by tasks
  - Task parallelism
  - Divide and conquer
- Organize by data
  - Geometric decomposition
  - Recursive decomposition
- Organize by flow
  - Pipeline
  - Event-based processing
Step 5: Map to program and data structures
- Program structures
  - Single program multiple data (SPMD)
  - Master/worker
  - Loop parallelism
  - Fork/join
- Data structures
  - Shared data
  - Shared queue
  - Distributed array
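The fork/join structure listed under Step 5 can be sketched with `concurrent.futures`: submitting tasks is the fork, and collecting their results is the join. This sketch uses threads so that it runs anywhere without a main guard; that is an assumption made for the example, and for CPU-bound work `ProcessPoolExecutor` offers the same interface. The function `word_count` and the sample `documents` are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def word_count(text):
    """Independent unit of work: count whitespace-separated words."""
    return len(text.split())


documents = [
    "the quick brown fox",
    "jumps over",
    "the lazy dog today",
]

with ThreadPoolExecutor(max_workers=2) as pool:
    # Fork: each submit() schedules one task and returns a Future immediately
    futures = [pool.submit(word_count, doc) for doc in documents]
    # Join: result() blocks until the corresponding task has finished
    counts = [future.result() for future in futures]

print(counts)
```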
Step 6: Map to parallel environment
- Multi-core shared memory
  - Cython with OpenMP
  - multiprocessing
  - IPython.cluster
- Multi-computer
  - IPython.cluster
  - MPI
  - Hadoop / Spark
- GPU
  - CUDA
  - OpenCL
Step 7: Execute, debug, tune in parallel environment
Embarrassingly parallel programs¶
Many statistical problems are embarrassingly parallel and can be easily decomposed into independent tasks or data sets. Here are several examples:
- Monte Carlo integration
- Multiple chains of MCMC
- Bootstrap for confidence intervals
- Power calculations by simulation
- Permutation-resampling tests
- Fitting same model on multiple data sets
Other problems are serial at small scale, but can be parallelized at large scales. For example, EM and MCMC iterations are inherently serial since there is a dependence on the previous state, but within a single iteration, there can be many thousands of density calculations (one for each data point to calculate the likelihood), and this is an embarrassingly parallel problem within a single iteration.
These “low hanging fruits” are great because they offer a path to easy parallelism with minimal complexity.
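As an illustration of the first item in the list (Monte Carlo integration), the sketch below estimates pi by splitting the sampling into independent tasks and sending them to a `multiprocessing.Pool`. The names `count_hits` and `estimate_pi` are hypothetical, and seeding each task with a consecutive integer is a crude assumption made for brevity; it does not guarantee independent random streams.

```python
import random
from multiprocessing import Pool


def count_hits(args):
    """Count random points in the unit square that fall inside the quarter circle."""
    seed, n_points = args
    rng = random.Random(seed)  # private generator per task (crude seeding)
    hits = 0
    for _ in range(n_points):
        x, y = rng.random(), rng.random()
        if x * x + y * y <= 1.0:
            hits += 1
    return hits


def estimate_pi(n_tasks=4, n_points=50_000, processes=2):
    """Farm independent tasks out to a pool and combine their hit counts."""
    tasks = [(seed, n_points) for seed in range(n_tasks)]
    with Pool(processes=processes) as pool:
        hits = pool.map(count_hits, tasks)
    return 4.0 * sum(hits) / (n_tasks * n_points)


if __name__ == "__main__":
    # The guard keeps worker processes from re-running this line on import
    print(estimate_pi())
```

No task needs anything from any other task, which is what makes the problem embarrassingly parallel: the only communication is the final sum of hit counts.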
Degree of parallelism¶
The bigger the problem, the more scope there is for parallelism.
Amdahl’s law says that the speedup from parallelization is bounded by the fraction of the algorithm that is irreducibly serial: if a fraction s of the run time cannot be parallelized, the speedup can never exceed 1/s, no matter how many processors are used. However, for big data analysis, Gustafson’s law is more relevant. This says that we are nearly always interested in increasing the size of the parallelizable bits, and the ratio of parallelizable to irreducibly serial code is not a static quantity but depends on data size. For example, Gibbs sampling has an irreducibly serial nature, but for large samples, each iteration may be able to perform PDF evaluations in parallel for zillions of data points.
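The two laws can be compared numerically with their standard textbook formulas. In this sketch the function names are hypothetical; `parallel_fraction` is the share of run time that can be parallelized, measured on the serial run for Amdahl's law and on the scaled parallel run for Gustafson's law.

```python
def amdahl_speedup(parallel_fraction, n_workers):
    """Fixed problem size: the serial part caps the achievable speedup."""
    serial_fraction = 1.0 - parallel_fraction
    return 1.0 / (serial_fraction + parallel_fraction / n_workers)


def gustafson_speedup(parallel_fraction, n_workers):
    """Problem size grows with the workers: scaled speedup keeps increasing."""
    return (1.0 - parallel_fraction) + parallel_fraction * n_workers


for n in (1, 10, 100, 1000):
    print(n, round(amdahl_speedup(0.95, n), 2), round(gustafson_speedup(0.95, n), 2))
```

With 95% parallelizable work, the Amdahl speedup approaches but never reaches 1/0.05 = 20, while the Gustafson scaled speedup grows roughly linearly with the number of workers.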
In [1]:
%matplotlib inline
In [2]:
import numpy as np
import matplotlib.pyplot as plt
from numba import jit, prange
In [3]:
import time
Task parallelism¶
There are many parallel programming idioms or patterns, and they can get quite confusing. One simple way to design a parallel solution is to ask if there is data and/or task parallelism.
- Data parallelism means that the data is distributed across processes (e.g. MPI, Hadoop, Spark).
- Task parallelism means that tasks (functions) are distributed across processes, and different units of work (data) are sent to each task (e.g. multithreading, multiprocessing, single GPU programming). Note that it is possible and common for the same task (function) to be distributed to multiple processes.
The library multiprocessing handles task parallelism using processes.
Unlike threads, which share the same memory space, each process has its
own memory space. The standard Python interpreter (CPython) does not
allow multiple threads to execute Python code at the same time (see
Global Interpreter Lock or GIL), and multi-threading is achieved by time
slicing. Threads are useful for tasks dominated by latency (waiting for
the network to respond, I/O) but will not show a speed-up on
computationally intensive tasks due to the GIL. Hence, computationally
intensive tasks need to be run as processes to see speedups. Later we
will see how we can temporarily release the GIL for parallel processing
in numba and cython.
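A small sketch of the latency-bound case: `time.sleep` stands in for waiting on a network or disk, and because a sleeping thread releases the GIL, four waits can overlap in four threads. The function `fake_request` and the delays are hypothetical, and the timings will vary from machine to machine.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_request(delay):
    """Stand-in for a network call: the thread just waits."""
    time.sleep(delay)
    return delay


delays = [0.2] * 4

# Serial version: the waits add up
start = time.perf_counter()
serial_results = [fake_request(d) for d in delays]
serial_time = time.perf_counter() - start

# Threaded version: the waits overlap
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    threaded_results = list(pool.map(fake_request, delays))
threaded_time = time.perf_counter() - start

print("serial: %.2f s, threaded: %.2f s" % (serial_time, threaded_time))
```

Replacing the sleep with a pure-Python numerical loop would be expected to remove most of this advantage, since only one thread can execute Python bytecode at a time.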
In [4]:
from multiprocessing import (Pool, Process, Queue, Lock,
                             cpu_count, current_process)
In [5]:
cpu_count()
Out[5]:
8
Embarrassingly parallel¶
In [6]:
from functools import partial
In [7]:
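lock = Lock()

def f(n, lock=lock):
    """Sleep for n seconds, then report which worker did the sleeping."""
    worker = current_process()
    time.sleep(n)
    # Hold the lock while printing so output from different workers does not interleave
    with lock:
        # _identity is a private attribute; worker.name is the public alternative
        print("Worker %d slept for %.2f seconds!" % (worker._identity[0], n))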
In [8]:
sleep_times = [0.1, 0.5, 5, 5, 0.1, 0.1, 1, 1, 5, 1, 1]
In [9]:
with Pool(processes=4) as p:
    p.map(f, sleep_times)
Worker 1 slept for 0.10 seconds!
Worker 1 slept for 0.10 seconds!
Worker 1 slept for 0.10 seconds!
Worker 2 slept for 0.50 seconds!
Worker 1 slept for 1.00 seconds!
Worker 2 slept for 1.00 seconds!
Worker 2 slept for 1.00 seconds!
Worker 2 slept for 1.00 seconds!
Worker 3 slept for 5.00 seconds!
Worker 4 slept for 5.00 seconds!
Worker 1 slept for 5.00 seconds!
Consumer-Producer¶
Note that we give each process its own random number generator, seeded
with the process id. This is crude, and does not guarantee that the
random numbers generated are independent. If you need to generate random
numbers in parallel, consider using the
`randomgen <https://bashtage.github.io/randomgen/#>`__ package and
the strategies for parallel random number generation.
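One such strategy, sketched here under the assumption that NumPy 1.17 or later is installed, is to derive one child seed per worker from a single root `SeedSequence` instead of seeding each worker with its process id. The root seed `12345` and the worker count are arbitrary choices for illustration.

```python
import numpy as np

# One root seed for the whole experiment
root = np.random.SeedSequence(12345)

# spawn() derives child seed sequences designed to give non-overlapping streams
child_seeds = root.spawn(4)

# Each worker would receive its own Generator built from its child seed
rngs = [np.random.default_rng(seed) for seed in child_seeds]

draws = [rng.integers(0, 100, size=3) for rng in rngs]
for worker_id, values in enumerate(draws):
    print(worker_id, values)
```

In a multiprocessing setting, each child seed (or the generator built from it) would be passed to the worker as an argument, in the same way the process id is passed in the example that follows.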
In [10]:
def producer(pid, q, lock):
    """Put three random integers on the queue, pausing between puts."""
    rng = np.random.RandomState(pid)
    for i in range(3):
        time.sleep(rng.uniform(0, 3))
        n = rng.randint(0, 100)
        with lock:
            print("Producer %d put %d in queue!" % (pid, n))
        q.put(n)

def consumer(pid, q, lock):
    """Take items off the queue forever; meant to run as a daemon process."""
    rng = np.random.RandomState(pid)
    while True:
        n = q.get()
        time.sleep(rng.uniform(0, 1))
        with lock:
            print("Consumer %d got %d" % (pid, n))

q = Queue()
lock = Lock()

producers = []
for i in range(4):
    p = Process(target=producer, args=(i, q, lock))
    producers.append(p)

consumers = []
for i in range(2):
    c = Process(target=consumer, args=(i, q, lock))
    c.daemon = True  # daemon processes are terminated when the parent exits
    consumers.append(c)

for p in producers:
    p.start()

for c in consumers:
    c.start()

# Optional synchronization step
for p in producers:
    p.join()
Producer 1 put 12 in queue!
Producer 2 put 72 in queue!
Producer 0 put 64 in queue!
Producer 3 put 3 in queue!
Consumer 1 got 72
Consumer 0 got 12
Consumer 1 got 64
Consumer 0 got 3
Producer 3 put 0 in queue!
Consumer 1 got 0
Producer 2 put 82 in queue!
Producer 0 put 67 in queue!
Consumer 0 got 82
Consumer 1 got 67
Producer 1 put 9 in queue!
Producer 3 put 19 in queue!
Consumer 1 got 19
Producer 2 put 7 in queue!
Consumer 1 got 7
Consumer 0 got 9
Producer 1 put 79 in queue!
Consumer 1 got 79
Producer 0 put 83 in queue!
Consumer 0 got 83
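The consumers in this example loop forever and only stop because they are daemons. A common alternative, sketched here with threads and `queue.Queue` rather than processes so that it runs anywhere without a main guard, is to send one sentinel value per consumer once all producers have finished. The `SENTINEL` marker and the simplified producer are assumptions made for illustration, not the notebook's own code.

```python
import queue
import threading

SENTINEL = None  # hypothetical "no more work" marker


def producer(pid, q, n_items):
    """Put n_items tagged work items on the queue."""
    for i in range(n_items):
        q.put((pid, i))


def consumer(q, results, lock):
    """Consume items until a sentinel arrives, then exit cleanly."""
    while True:
        item = q.get()
        if item is SENTINEL:
            break
        with lock:
            results.append(item)


q = queue.Queue()
lock = threading.Lock()
results = []

producers = [threading.Thread(target=producer, args=(pid, q, 3)) for pid in range(4)]
consumers = [threading.Thread(target=consumer, args=(q, results, lock)) for _ in range(2)]

for t in producers + consumers:
    t.start()
for t in producers:
    t.join()

# All real items are queued by now; add one sentinel per consumer
for _ in consumers:
    q.put(SENTINEL)
for t in consumers:
    t.join()

print(sorted(results))
```

Because the queue is first-in, first-out and the sentinels are added only after every producer has been joined, each consumer sees its sentinel only once the real work is drained.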