Parallel Programming

References

The goal is to design parallel programs that are flexible, efficient and simple.

Step 0: Start by profiling a serial program to identify bottlenecks

Step 1: Are there opportunities for parallelism?

  • Can tasks be performed in parallel?
    • Function calls
    • Loops
  • Can data be split and operated on in parallel?
    • Decomposition of arrays along rows, columns, blocks
    • Decomposition of trees into sub-trees
  • Is there a pipeline with a sequence of stages?
    • Data preprocessing and analysis
    • Graphics rendering
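
As a small illustration of the data-decomposition question above, the sketch below splits an array along rows and columns with NumPy's `array_split`. The array `data` and the per-chunk operation (a sum) are made up for the example; any independent per-chunk computation could take its place.

```python
import numpy as np

# Toy array invented for this example: 6 rows, 4 columns.
data = np.arange(24).reshape(6, 4)

# Row-wise decomposition into 3 chunks (axis=0), column-wise into 2 (axis=1).
row_chunks = np.array_split(data, 3, axis=0)
col_chunks = np.array_split(data, 2, axis=1)

# Each chunk can be processed independently; partial results are then combined.
partial_sums = [chunk.sum() for chunk in row_chunks]
total = sum(partial_sums)

print([c.shape for c in row_chunks], [c.shape for c in col_chunks], total)
```

Here the chunks are processed in a plain list comprehension; in a parallel setting each chunk would be handed to a different worker.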

Step 2: What is the nature of the parallelism?

  • Linear
    • Embarrassingly parallel programs
  • Recursive
    • Adaptive partitioning methods

Step 3: What is the granularity?

  • 10s of jobs
  • 1000s of jobs

Step 4: Choose an algorithm

  • Organize by tasks
    • Task parallelism
    • Divide and conquer
  • Organize by data
    • Geometric decomposition
    • Recursive decomposition
  • Organize by flow
    • Pipeline
    • Event-based processing

Step 5: Map to program and data structures

  • Program structures
    • Single program multiple data (SPMD)
    • Master/worker
    • Loop parallelism
    • Fork/join
  • Data structures
    • Shared data
    • Shared queue
    • Distributed array

Step 6: Map to parallel environment

  • Multi-core shared memory
    • Cython with OpenMP
    • multiprocessing
    • IPython.cluster
  • Multi-computer
    • IPython.cluster
    • MPI
    • Hadoop / Spark
  • GPU
    • CUDA
    • OpenCL

Step 7: Execute, debug, tune in parallel environment

Embarrassingly parallel programs

Many statistical problems are embarrassingly parallel and can be easily decomposed into independent tasks or data sets. Here are several examples:

  • Monte Carlo integration
  • Multiple chains of MCMC
  • Bootstrap for confidence intervals
  • Power calculations by simulation
  • Permutation-resampling tests
  • Fitting same model on multiple data sets

Other problems are serial at a small scale but can be parallelized at a large scale. For example, EM and MCMC iterations are inherently serial because each iteration depends on the previous state. Within a single iteration, however, there can be many thousands of density calculations (one per data point to calculate the likelihood), and these calculations are embarrassingly parallel.

This “low-hanging fruit” is attractive because it offers a path to easy parallelism with minimal complexity.
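
To make the idea concrete, here is a minimal sketch of Monte Carlo integration split into independent tasks. It estimates pi from the area of a quarter circle. The function names `pi_chunk` and `estimate_pi`, the root seed, and the `mapper` argument are all assumptions made for this example, not part of any library.

```python
import numpy as np


def pi_chunk(args):
    """Estimate pi from n random points, using an independent seed."""
    seed, n = args
    rng = np.random.default_rng(seed)
    x, y = rng.random(n), rng.random(n)
    # Fraction of points inside the unit quarter circle, times 4.
    return 4.0 * np.mean(x * x + y * y <= 1.0)


def estimate_pi(n_tasks, n_per_task, mapper=map):
    """Average independent chunk estimates; mapper may be serial or parallel."""
    # Root seed 0 is arbitrary; spawn() gives one child seed per task.
    seeds = np.random.SeedSequence(0).spawn(n_tasks)
    tasks = [(seed, n_per_task) for seed in seeds]
    return float(np.mean(list(mapper(pi_chunk, tasks))))


print(estimate_pi(n_tasks=8, n_per_task=100_000))
```

Because no task depends on any other, the built-in `map` used here could in principle be replaced by the `map` method of a process pool without changing `pi_chunk`.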

Degree of parallelism

The bigger the problem, the more scope there is for parallelism.

Amdahl’s law says that the speedup from parallelization is bounded by the fraction of the algorithm that is irreducibly serial: if a fraction s of the run time is serial, the speedup cannot exceed 1/s, however many processors are used. However, for big data analysis, Gustafson’s law is more relevant. It observes that we are nearly always interested in increasing the size of the parallelizable part, so the ratio of parallelizable to irreducibly serial code is not a static quantity but depends on data size. For example, Gibbs sampling has an irreducibly serial nature, but for large samples, each iteration may be able to perform PDF evaluations in parallel for a very large number of data points.
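
The two laws can be compared numerically. The sketch below uses the standard textbook forms of each law. The function names and the 5% serial fraction are assumptions chosen for illustration.

```python
def amdahl_speedup(serial_fraction, n_workers):
    """Speedup for a fixed-size problem (Amdahl's law)."""
    parallel_fraction = 1.0 - serial_fraction
    return 1.0 / (serial_fraction + parallel_fraction / n_workers)


def gustafson_speedup(serial_fraction, n_workers):
    """Scaled speedup when the parallel part grows with n_workers (Gustafson's law)."""
    return serial_fraction + (1.0 - serial_fraction) * n_workers


# Assumed for illustration: 5% of the run time is irreducibly serial.
s = 0.05
for n in (1, 8, 64, 1024):
    print(n, round(amdahl_speedup(s, n), 2), round(gustafson_speedup(s, n), 2))
```

With a fixed problem size the Amdahl speedup levels off below 1/s = 20, while the Gustafson speedup keeps growing because the parallel workload is assumed to scale with the number of workers.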

In [1]:
%matplotlib inline
In [2]:
import numpy as np
import matplotlib.pyplot as plt
from numba import jit, prange
In [3]:
import time

Task parallelism

There are many parallel programming idioms or patterns, and this can get quite confusing. One simple way to design a parallel solution is to ask if there is data and/or task parallelism.

  • Data parallelism means that the data is distributed across processes (e.g. MPI, Hadoop, Spark)
  • Task parallelism means that tasks (functions) are distributed across processes, and different units of work (data) are sent to each task (e.g. multithreading, multiprocessing, single GPU programming). Note that it is possible and common for the same task (function) to be distributed to multiple processes.

The multiprocessing library handles task parallelism using processes. Unlike threads, which share the same memory space, each process has its own memory space. The standard CPython interpreter does not allow multiple threads to execute Python bytecode at the same time (see the Global Interpreter Lock, or GIL), so multi-threading is achieved by time slicing. Threads are useful for tasks dominated by latency (waiting for the network to respond, I/O) but will not show a speed-up on computationally intensive tasks because of the GIL. Hence, computationally intensive tasks need to be run as processes to see speedups. Later we will see how we can temporarily release the GIL for parallel processing in numba and cython.
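
The point about latency-bound work can be checked with a small experiment. The sketch below uses `ThreadPoolExecutor` from the standard library. The function `fake_io` is a hypothetical stand-in for a network or disk wait, and the 0.2 second delay is arbitrary.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io(delay):
    """Stand-in for a network or disk wait; time.sleep releases the GIL."""
    time.sleep(delay)
    return delay


delays = [0.2] * 4

# Serial version: the waits add up.
start = time.perf_counter()
serial = [fake_io(d) for d in delays]
serial_time = time.perf_counter() - start

# Threaded version: the waits overlap.
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    threaded = list(pool.map(fake_io, delays))
threaded_time = time.perf_counter() - start

print("serial: %.2f s, threaded: %.2f s" % (serial_time, threaded_time))
```

The threaded version finishes sooner only because the work is waiting rather than computing. Replacing `fake_io` with a CPU-bound pure-Python loop would be expected to remove most of the gain.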

In [4]:
from multiprocessing import (Pool, Process, Queue, Lock,
                             cpu_count,  current_process)
In [5]:
cpu_count()
Out[5]:
8

Embarrassingly parallel

In [6]:
from functools import partial
In [7]:
lock = Lock()

def f(n, lock=lock):
    worker = current_process()
    time.sleep(n)
    with lock:
        print("Worker %d slept for %.2f seconds!"% (worker._identity[0], n))
In [8]:
sleep_times = [0.1, 0.5, 5, 5, 0.1, 0.1, 1, 1, 5, 1, 1]
In [9]:
with Pool(processes=4) as p:
    p.map(f, sleep_times)
Worker 1 slept for 0.10 seconds!
Worker 1 slept for 0.10 seconds!
Worker 1 slept for 0.10 seconds!
Worker 2 slept for 0.50 seconds!
Worker 1 slept for 1.00 seconds!
Worker 2 slept for 1.00 seconds!
Worker 2 slept for 1.00 seconds!
Worker 2 slept for 1.00 seconds!
Worker 3 slept for 5.00 seconds!
Worker 4 slept for 5.00 seconds!
Worker 1 slept for 5.00 seconds!

Consumer-Producer

Note that we give each process its own random number generator, seeded with its worker index (pid). This is crude and does not guarantee that the random numbers generated are independent. If you need to generate random numbers in parallel, consider using the randomgen package (https://bashtage.github.io/randomgen/#) and the strategies for parallel random number generation.
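
One such strategy is available in NumPy itself (version 1.17 or later) and is sketched below. It is an alternative to randomgen, not what the code in this section uses. The root seed 12345 and the number of workers are arbitrary choices for the example.

```python
import numpy as np

# Hypothetical root seed chosen for illustration.
root = np.random.SeedSequence(12345)

# One child seed sequence per worker; spawn() is designed to give
# streams that are independent with very high probability.
children = root.spawn(4)
rngs = [np.random.default_rng(child) for child in children]

# Each worker would draw from its own generator.
draws = [rng.uniform(0, 3, size=3) for rng in rngs]
for pid, d in enumerate(draws):
    print(pid, d)
```

In a multiprocessing setting, each child seed sequence would be passed to one process, which then builds its own generator from it.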

In [10]:
def producer(pid, q, lock):
    re = np.random.RandomState(pid)
    for i in range(3):
        time.sleep(re.uniform(0, 3))
        n = re.randint(0, 100)
        with lock:
            print("Producer %d put %d in queue!" % (pid, n))
        q.put(n)

def consumer(pid, q, lock):
    re = np.random.RandomState(pid)
    while True:
        n = q.get()
        time.sleep(re.uniform(0, 1))
        with lock:
            print("Consumer %d got %d" % (pid, n))

q = Queue()
lock = Lock()

producers = []
for i in range(4):
    p = Process(target=producer, args=(i, q, lock))
    producers.append(p)

consumers = []
for i in range(2):
    c = Process(target=consumer, args=(i, q, lock))
    # Daemon consumers are terminated when the main process exits
    c.daemon = True
    consumers.append(c)

for p in producers:
    p.start()

for c in consumers:
    c.start()

# Optional synchronization step
for p in producers:
    p.join()
Producer 1 put 12 in queue!
Producer 2 put 72 in queue!
Producer 0 put 64 in queue!
Producer 3 put 3 in queue!
Consumer 1 got 72
Consumer 0 got 12
Consumer 1 got 64
Consumer 0 got 3
Producer 3 put 0 in queue!
Consumer 1 got 0
Producer 2 put 82 in queue!
Producer 0 put 67 in queue!
Consumer 0 got 82
Consumer 1 got 67
Producer 1 put 9 in queue!
Producer 3 put 19 in queue!
Consumer 1 got 19
Producer 2 put 7 in queue!
Consumer 1 got 7
Consumer 0 got 9
Producer 1 put 79 in queue!
Consumer 1 got 79
Producer 0 put 83 in queue!
Consumer 0 got 83
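
The consumers above loop forever and stop only because they are daemon processes. A common alternative is to send each consumer a sentinel value that tells it to exit. The sketch below shows the idea with threads and `queue.Queue` rather than processes, purely to keep it short. The `SENTINEL` marker and the simplified `producer` and `consumer` signatures are assumptions for this example.

```python
import queue
import threading

SENTINEL = None  # hypothetical marker meaning "no more work"


def producer(pid, q, n_items):
    for i in range(n_items):
        q.put((pid, i))


def consumer(q, results, lock):
    while True:
        item = q.get()
        if item is SENTINEL:
            break  # clean exit instead of relying on daemon termination
        with lock:
            results.append(item)


q = queue.Queue()
lock = threading.Lock()
results = []

producers = [threading.Thread(target=producer, args=(pid, q, 3)) for pid in range(4)]
consumers = [threading.Thread(target=consumer, args=(q, results, lock)) for _ in range(2)]

for t in producers + consumers:
    t.start()
for t in producers:
    t.join()
# All real items are queued; send one sentinel per consumer, then wait for them.
for _ in consumers:
    q.put(SENTINEL)
for t in consumers:
    t.join()

print(len(results))
```

Because the sentinels are queued only after every producer has finished, each consumer sees all remaining work before it sees its sentinel.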