{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Parallel Programming" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**References**\n", "\n", "- [`threading` - Thread-based parallelism](https://docs.python.org/3.6/library/threading.html)\n", "- [`multiprocessing` - Process-based “threading” interface](https://docs.python.org/3.6/library/multiprocessing.html)\n", "- [`multiprocess` - a \"better\" `multiprocessing`](https://github.com/uqfoundation/multiprocess)\n", "- [`concurrent.futures` - Launching parallel tasks](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\n", "- [Concurrency with Processes, Threads, and Coroutines](https://pymotw.com/3//concurrency.html)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The goal is to design parallel programs that are flexible, efficient and simple.\n", "\n", "**Step 0**: Start by profiling a serial program to identify bottlenecks\n", "\n", "**Step 1**: Are there opportunities for parallelism?\n", "\n", "- Can tasks be performed in parallel?\n", "    - Function calls\n", "    - Loops\n", "- Can data be split and operated on in parallel?\n", "    - Decomposition of arrays along rows, columns, blocks\n", "    - Decomposition of trees into sub-trees\n", "- Is there a pipeline with a sequence of stages?\n", "    - Data preprocessing and analysis\n", "    - Graphics rendering\n", "\n", "**Step 2**: What is the nature of the parallelism?\n", "\n", "- Linear\n", "    - Embarrassingly parallel programs\n", "- Recursive\n", "    - Adaptive partitioning methods\n", "\n", "**Step 3**: What is the granularity?\n", "\n", "- 10s of jobs\n", "- 1000s of jobs\n", "\n", "**Step 4**: Choose an algorithm\n", "\n", "- Organize by tasks\n", "    - Task parallelism\n", "    - Divide and conquer\n", "\n", "- Organize by data\n", "    - Geometric decomposition\n", "    - Recursive decomposition\n", "\n", "- Organize by flow\n", "    - Pipeline\n", "    - Event-based processing\n", "\n", "**Step 5**: Map 
to program and data structures\n", "\n", "- Program structures\n", "    - Single program multiple data (SPMD)\n", "    - Master/worker\n", "    - Loop parallelism\n", "    - Fork/join\n", "- Data structures\n", "    - Shared data\n", "    - Shared queue\n", "    - Distributed array\n", "\n", "**Step 6**: Map to parallel environment\n", "\n", "- Multi-core shared memory\n", "    - Cython with OpenMP\n", "    - multiprocessing\n", "    - IPython.cluster\n", "- Multi-computer\n", "    - IPython.cluster\n", "    - MPI\n", "    - Hadoop / Spark\n", "- GPU\n", "    - CUDA\n", "    - OpenCL\n", "\n", "**Step 7**: Execute, debug, tune in parallel environment" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Embarrassingly parallel programs\n", "\n", "Many statistical problems are embarrassingly parallel and can be easily decomposed into independent tasks or data sets. Here are several examples:\n", "\n", "- Monte Carlo integration\n", "- Multiple chains of MCMC\n", "- Bootstrap for confidence intervals\n", "- Power calculations by simulation\n", "- Permutation-resampling tests\n", "- Fitting the same model on multiple data sets\n", "\n", "Other problems are serial at small scale, but can be parallelized at large scales. For example, EM and MCMC iterations are inherently serial since each iteration depends on the previous state, but within a single iteration there can be many thousands of density calculations (one for each data point to calculate the likelihood), and these calculations are embarrassingly parallel.\n", "\n", "These \"low-hanging fruits\" are attractive because they offer a path to easy parallelism with minimal complexity." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Degree of parallelism\n", "\n", "**The bigger the problem, the more scope there is for parallelism**\n", "\n", "**Amdahl's law** says that the speedup from parallelization is bounded by the ratio of parallelizable to irreducibly serial code in the algorithm: if a fraction $s$ of the run time is irreducibly serial, the speedup can never exceed $1/s$, no matter how many processors are used. 
However, for big data analysis, **Gustafson's law** is more relevant. This says that we are nearly always interested in increasing the size of the parallelizable bits, and the ratio of parallelizable to irreducibly serial code is not a static quantity but depends on data size. For example, Gibbs sampling has an irreducibly serial nature, but for large samples, each iteration may be able to perform PDF evaluations in parallel for zillions of data points." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "%matplotlib inline" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "import matplotlib.pyplot as plt\n", "from numba import jit, prange" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "import time" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Task parallelism\n", "\n", "There are many parallel programming idioms or patterns, and the variety can get quite confusing. One simple way to design a parallel solution is to ask whether there is data and/or task parallelism.\n", "\n", "- Data parallelism means that the data is distributed across processes (e.g. MPI, Hadoop, Spark)\n", "- Task parallelism means that tasks (functions) are distributed across processes, and different units of work (data) are sent to each task (e.g. multithreading, multiprocessing, single GPU programming). Note that it is possible and common for the same task (function) to be distributed to multiple processes.\n", "\n", "The library `multiprocessing` handles task parallelism using processes. Unlike threads, which share the same memory space, each process has its own memory space. The standard CPython interpreter does not allow multiple threads to execute Python bytecode at the same time (see Global Interpreter Lock or [GIL](http://www.dabeaz.com/python/UnderstandingGIL.pdf)), and multi-threading is achieved by time slicing. 
Threads are useful for tasks dominated by latency (waiting for the network to respond, I/O) but will not show a speed-up on computationally intensive tasks due to the GIL. Hence, computationally intensive tasks need to be run as processes to see speedups. Later we will see how we can temporarily release the GIL for parallel processing in `numba` and `cython`." ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "from multiprocessing import (Pool, Process, Queue, Lock,\n", "                             cpu_count, current_process)" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "8" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "cpu_count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Embarrassingly parallel" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "from functools import partial" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "lock = Lock()\n", "\n", "def f(n, lock=lock):\n", "    worker = current_process()\n", "    time.sleep(n)\n", "    # The lock keeps output from different workers from interleaving.\n", "    # `_identity` is a private, undocumented attribute; here it numbers the workers.\n", "    with lock:\n", "        print(\"Worker %d slept for %.2f seconds!\" % (worker._identity[0], n))" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "sleep_times = [0.1, 0.5, 5, 5, 0.1, 0.1, 1, 1, 5, 1, 1]" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Worker 1 slept for 0.10 seconds!\n", "Worker 1 slept for 0.10 seconds!\n", "Worker 1 slept for 0.10 seconds!\n", "Worker 2 slept for 0.50 seconds!\n", "Worker 1 slept for 1.00 seconds!\n", "Worker 2 slept for 1.00 seconds!\n", "Worker 2 slept for 1.00 seconds!\n", "Worker 2 slept for 1.00 seconds!\n", "Worker 3 slept for 5.00 seconds!\n", "Worker 4 slept for 5.00 seconds!\n", "Worker 1 slept for 5.00 seconds!\n" ] } ], "source": [ "with 
Pool(processes=4) as p:\n", "    p.map(f, sleep_times)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Consumer-Producer\n", "\n", "Note that we give each process its own random number generator, seeded with its index `pid`. This is crude, and does not guarantee that the random numbers generated are independent. If you need to generate random numbers in parallel, consider using the [`randomgen`](https://bashtage.github.io/randomgen/#) package and the [strategies for parallel random number generation](https://bashtage.github.io/randomgen/parallel.html)." ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Producer 1 put 12 in queue!\n", "Producer 2 put 72 in queue!\n", "Producer 0 put 64 in queue!\n", "Producer 3 put 3 in queue!\n", "Consumer 1 got 72\n", "Consumer 0 got 12\n", "Consumer 1 got 64\n", "Consumer 0 got 3\n", "Producer 3 put 0 in queue!\n", "Consumer 1 got 0\n", "Producer 2 put 82 in queue!\n", "Producer 0 put 67 in queue!\n", "Consumer 0 got 82\n", "Consumer 1 got 67\n", "Producer 1 put 9 in queue!\n", "Producer 3 put 19 in queue!\n", "Consumer 1 got 19\n", "Producer 2 put 7 in queue!\n", "Consumer 1 got 7\n", "Consumer 0 got 9\n", "Producer 1 put 79 in queue!\n", "Consumer 1 got 79\n", "Producer 0 put 83 in queue!\n", "Consumer 0 got 83\n" ] } ], "source": [ "def producer(pid, q, lock):\n", "    re = np.random.RandomState(pid)  # per-process generator seeded by index\n", "    for i in range(3):\n", "        time.sleep(re.uniform(0, 3))\n", "        n = re.randint(0, 100)\n", "        with lock:\n", "            print(\"Producer %d put %d in queue!\" % (pid, n))\n", "        q.put(n)\n", "\n", "def consumer(pid, q, lock):\n", "    re = np.random.RandomState(pid)\n", "    while True:\n", "        n = q.get()  # blocks until an item is available\n", "        time.sleep(re.uniform(0, 1))\n", "        with lock:\n", "            print(\"Consumer %d got %d\" % (pid, n))\n", "\n", "q = Queue()\n", "lock = Lock()\n", "\n", "producers = []\n", "for i in range(4):\n", "    p = Process(target=producer, args=(i, q, lock))\n", "    
producers.append(p)\n", "\n", "consumers = []\n", "for i in range(2):\n", "    c = Process(target=consumer, args=(i, q, lock))\n", "    c.daemon = True  # daemonic consumers are terminated when the parent process exits\n", "    consumers.append(c)\n", "\n", "for p in producers:\n", "    p.start()\n", "\n", "for c in consumers:\n", "    c.start()\n", "\n", "# Optional synchronization step: wait for all producers to finish\n", "for p in producers:\n", "    p.join()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.3" } }, "nbformat": 4, "nbformat_minor": 2 }