{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "Multi-Core Parallelism\n", "====" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "%load_ext cython" ] }, { "cell_type": "code", "execution_count": 48, "metadata": {}, "outputs": [], "source": [ "%matplotlib inline\n", "import matplotlib.pyplot as plt\n", "import numpy as np\n", "import os" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor\n", "import multiprocessing as mp\n", "from multiprocessing import Pool, Value, Array\n", "import time\n", "from numba import njit" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Vanilla Python\n", "----" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "def mc_pi(n):\n", " s = 0\n", " for i in range(n):\n", " x = np.random.uniform(-1, 1)\n", " y = np.random.uniform(-1, 1)\n", " if (x**2 + y**2) < 1:\n", " s += 1\n", " return 4*s/n" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 2.12 s, sys: 4 ms, total: 2.13 s\n", "Wall time: 2.14 s\n" ] } ], "source": [ "%%time\n", "\n", "res = [mc_pi(int(1e5)) for i in range(10)]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Using `numba` to speed up computation\n", "----" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "@njit()\n", "def mc_pi_numba(n):\n", " s = 0\n", " for i in range(n):\n", " x = np.random.uniform(-1, 1)\n", " y = np.random.uniform(-1, 1)\n", " if (x**2 + y**2) < 1:\n", " s += 1\n", " return 4*s/n" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 1.63 s, sys: 20 ms, total: 1.65 s\n", "Wall time: 1.65 s\n" ] } ], "source": [ 
"%%time\n", "\n", "res = [mc_pi_numba(int(1e7)) for i in range(10)]" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "array([ 3.1408772, 3.1410464, 3.1416584, 3.141608 , 3.1412532,\n", " 3.141702 , 3.141554 , 3.1422448, 3.1413172, 3.1411456])" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "np.array(res)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Using `cython` to speed up computation" ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "scrolled": false }, "outputs": [], "source": [ "%%cython\n", "\n", "import cython\n", "\n", "from libc.stdlib cimport rand\n", "cdef extern from \"limits.h\":\n", " int INT_MAX\n", "\n", "@cython.cdivision(True) \n", "def mc_pi_cython(int n):\n", " cdef double s = 0.0\n", " cdef double x, y\n", " cdef int i\n", "\n", " for i in range(n):\n", " x = 2*(rand()/float(INT_MAX)) - 1\n", " y = 2*(rand()/float(INT_MAX)) - 1\n", " if (x**2 + y**2) < 1:\n", " s += 1\n", " return 4*s/n" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 4.2 s, sys: 16 ms, total: 4.22 s\n", "Wall time: 4.21 s\n" ] } ], "source": [ "%%time\n", "\n", "res = [mc_pi_cython(int(1e7)) for i in range(10)]" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "array([ 3.1422556, 3.1427648, 3.142728 , 3.141858 , 3.1406612,\n", " 3.141878 , 3.1413508, 3.1413644, 3.1408592, 3.1417288])" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "np.array(res)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `concurrent.futures` module\n", "----\n", "\n", "Concurrent processes are processes that will return the same results regardless of the order in which they were executed. 
A \"future\" is something that will return a result sometime in the future. The `concurrent.futures` module provides an event handler, which can be fed functions to be scheduled for future execution. This provides us with a simple model for parallel execution on a multi-core machine.\n", "\n", "While concurrent futures provide a simpler interface, it is slower and less flexible when compared with using `multiprocessing` for parallel execution." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Using processes in parallel with `ProcessPoolExecutor`\n", "----\n", "\n", "We get a linear speedup as expected." ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 16 ms, sys: 20 ms, total: 36 ms\n", "Wall time: 1.5 s\n" ] } ], "source": [ "%%time\n", "\n", "with ProcessPoolExecutor(max_workers=4) as pool:\n", " res = pool.map(mc_pi_cython, [int(1e7) for i in range(10)])" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "array([ 3.1412484, 3.1412484, 3.1412484, 3.1412484, 3.1412768,\n", " 3.1412768, 3.1412768, 3.1412768, 3.1412796, 3.1412796])" ] }, "execution_count": 13, "metadata": {}, "output_type": "execute_result" } ], "source": [ "np.array(list(res))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### When you have many jobs\n", "\n", "The `futures` object gives fine control over the process, such as adding callbacks and canceling a submitted job, but is computationally expensive. We can use the `chunksize` argument to reduce this cost when submitting many jobs." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Using default chunksize of 1 for 10000 jobs\n", "\n", "The total amount of computation whether you have 10 jobs of size 10,000,000 or 10,000 jobs of size 10,000 is essentially the same, so we would expect them both to take about the same amount of time." 
] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 3.02 s, sys: 2.59 s, total: 5.61 s\n", "Wall time: 4.2 s\n" ] } ], "source": [ "%%time\n", "\n", "with ProcessPoolExecutor(max_workers=4) as pool:\n", "    res = pool.map(mc_pi_cython, [int(1e4) for i in range(int(1e4))])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Using chunksize of 100" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 48 ms, sys: 60 ms, total: 108 ms\n", "Wall time: 1.32 s\n" ] } ], "source": [ "%%time\n", "\n", "with ProcessPoolExecutor(max_workers=4) as pool:\n", "    res = pool.map(mc_pi_cython, [int(1e4) for i in range(int(1e4))], chunksize=100)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Fine control of processes" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Status of processes" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [], "source": [ "def f1(x):\n", "    return x**2\n", "\n", "def f2(x, y):\n", "    return x*y" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "a running: True\n", "a done: False\n", "b running: True\n", "b done: False\n", "c running: True\n", "c done: False\n", "a result 1\n", "b result 2\n", "c result 100\n" ] } ], "source": [ "with ProcessPoolExecutor(max_workers=4) as pool:\n", "    a = pool.submit(f2, 1, 1)\n", "    b = pool.submit(f2, 1, 2)\n", "    c = pool.submit(f1, 10)\n", "\n", "    print('a running:', a.running())\n", "    print('a done:', a.done())\n", "\n", "    print('b running:', b.running())\n", "    print('b done:', b.done())\n", "\n", "    print('c running:', c.running())\n", "    print('c done:', c.done())\n", "\n", "    print('a result', a.result())\n", "    print('b result', b.result())\n", "    print('c result', 
c.result())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Canceling jobs and adding callbacks" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Process done!\n", "0.647142783744\n", "0.0165398742026\n", "Running\n", "Process done!\n", "Process done!\n", "Process done!\n", "Process done!\n", "Process done!\n", "Process done!\n", "Process done!\n", "Process done!\n", "Process done!\n", "Process done!\n", "Process done!\n", "0.0678962539814\n", "0.285092840593\n", "0.00702848555339\n", "0.147327841571\n", "0.0804591350953\n", "0.00422308306556\n", "0.224027957196\n", "0.219935389538\n", "0.390023881571\n", "0.00762518296509\n", "0.174535563025\n", "0.202128563571\n", "0.134040028724\n", "0.780790978879\n", "0.0476424379436\n", "0.946881166408\n", "0.270935375167\n", "0.0154612806871\n", "0.0158441978589\n", "0.0907343392355\n", "0.0266080145638\n", "0.0754518489776\n" ] } ], "source": [ "njobs = 24\n", "\n", "res = []\n", "\n", "with ProcessPoolExecutor(max_workers=4) as pool:\n", "\n", "    for i in range(njobs):\n", "        res.append(pool.submit(f2, *np.random.rand(2)))\n", "        if i % 2 == 0:\n", "            res[i].add_done_callback(lambda future: print(\"Process done!\"))\n", "    res[4].cancel()\n", "    if res[4].cancelled():\n", "        print(\"Process 4 cancelled\")\n", "\n", "    for i, x in enumerate(res):\n", "        while x.running():\n", "            print(\"Running\")\n", "            time.sleep(1)\n", "        if not x.cancelled():\n", "            print(x.result())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Functions with multiple arguments" ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [], "source": [ "def f(a, b):\n", "    return a + b" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Using a function adapter" ] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [], "source": [ "def f_(args):\n", "    return f(*args)" ] }, { 
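"cell_type": "markdown", "metadata": {}, "source": [
"As an alternative to the adapter above, `Executor.map` accepts several iterables and passes one element from each as positional arguments. A minimal sketch, using a thread pool and a hypothetical `add` function so that it runs anywhere; `ProcessPoolExecutor.map` takes its arguments the same way.\n",
"\n",
"```python\n",
"from concurrent.futures import ThreadPoolExecutor\n",
"\n",
"def add(a, b):\n",
"    return a + b\n",
"\n",
"xs = list(range(24))\n",
"\n",
"# map pairs the iterables element-wise: add(xs[0], xs[1]), add(xs[2], xs[3]), ...\n",
"with ThreadPoolExecutor(max_workers=4) as pool:\n",
"    sums = list(pool.map(add, xs[0::2], xs[1::2]))\n",
"\n",
"print(sums)\n",
"```\n",
"\n",
"This avoids both the wrapper function and the explicit splitting into pairs."
] }, {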
"cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [], "source": [ "xs = np.arange(24)\n", "chunks = np.array_split(xs, xs.shape[0]//2)" ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[array([0, 1]),\n", " array([2, 3]),\n", " array([4, 5]),\n", " array([6, 7]),\n", " array([8, 9]),\n", " array([10, 11]),\n", " array([12, 13]),\n", " array([14, 15]),\n", " array([16, 17]),\n", " array([18, 19]),\n", " array([20, 21]),\n", " array([22, 23])]" ] }, "execution_count": 25, "metadata": {}, "output_type": "execute_result" } ], "source": [ "chunks" ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[1, 5, 9, 13, 17, 21, 25, 29, 33, 37, 41, 45]" ] }, "execution_count": 26, "metadata": {}, "output_type": "execute_result" } ], "source": [ "with ProcessPoolExecutor(max_workers=4) as pool:\n", " res = pool.map(f_, chunks)\n", "list(res)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Using processes in parallel with ThreadPoolExecutor\n", "----\n", "\n", "We do not get any speedup because the GIL only allows one thread to run at one time." 
] }, { "cell_type": "code", "execution_count": 27, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 4.34 s, sys: 32 ms, total: 4.38 s\n", "Wall time: 4.32 s\n" ] } ], "source": [ "%%time\n", "\n", "with ThreadPoolExecutor(max_workers=4) as pool:\n", "    res = pool.map(mc_pi_cython, [int(1e7) for i in range(10)])" ] }, { "cell_type": "code", "execution_count": 28, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "array([ 3.1412484, 3.1412768, 3.1412796, 3.1421904, 3.1422264,\n", " 3.1411704, 3.1415984, 3.1423524, 3.1411896, 3.1417464])" ] }, "execution_count": 28, "metadata": {}, "output_type": "execute_result" } ], "source": [ "np.array(list(res))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Turning off the GIL in `cython`\n", "----" ] }, { "cell_type": "code", "execution_count": 33, "metadata": {}, "outputs": [], "source": [ "%%cython\n", "\n", "import cython\n", "\n", "from libc.stdlib cimport rand\n", "cdef extern from \"limits.h\":\n", "    int INT_MAX\n", "\n", "@cython.cdivision(True)\n", "def mc_pi_cython_nogil(int n):\n", "    cdef double s = 0.0\n", "    cdef double x, y\n", "    cdef int i\n", "\n", "    with cython.nogil:\n", "        for i in range(n):\n", "            x = 2*(rand()/float(INT_MAX)) - 1\n", "            y = 2*(rand()/float(INT_MAX)) - 1\n", "            if (x**2 + y**2) < 1:\n", "                s += 1\n", "    return 4*s/n" ] }, { "cell_type": "code", "execution_count": 34, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 4.08 s, sys: 0 ns, total: 4.08 s\n", "Wall time: 4.08 s\n" ] } ], "source": [ "%%time\n", "\n", "res = [mc_pi_cython_nogil(int(1e7)) for i in range(10)]" ] }, { "cell_type": "code", "execution_count": 35, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "array([ 3.141222 , 3.1419168, 3.1416088, 3.1414532, 3.142072 ,\n", " 3.141182 , 3.142152 , 3.1412056, 3.142742 , 3.141344 ])" ] }, "execution_count": 35, "metadata": {}, "output_type": "execute_result" } ], 
"source": [ "np.array(res)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Using processes in parallel with `ThreadPoolExecutor` and `nogil`\n", "----\n", "\n", "We finally get the linear speedup expected. Note that threads are actually faster than processes because there is less overhead to using a thread." ] }, { "cell_type": "code", "execution_count": 36, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 9.65 s, sys: 35.2 s, total: 44.8 s\n", "Wall time: 13.4 s\n" ] } ], "source": [ "%%time\n", "\n", "with ThreadPoolExecutor(max_workers=4) as pool:\n", " res = pool.map(mc_pi_cython_nogil, [int(1e7) for i in range(10)])" ] }, { "cell_type": "code", "execution_count": 37, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "array([ 3.1419416, 3.1409748, 3.141634 , 3.1413224, 3.1419116,\n", " 3.1409356, 3.14161 , 3.1404276, 3.142516 , 3.1415676])" ] }, "execution_count": 37, "metadata": {}, "output_type": "execute_result" } ], "source": [ "np.array(list(res))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Using `multiprocessing`\n", "----\n", "\n", "One nice thing about using `multiprocessing` is that it works equally well for small numbers of large jobs, or large numbers of small jobs out of the box." 
] }, { "cell_type": "code", "execution_count": 41, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 8 ms, sys: 32 ms, total: 40 ms\n", "Wall time: 1.44 s\n" ] } ], "source": [ "%%time\n", "\n", "with mp.Pool(processes=4) as pool:\n", "    res = pool.map(mc_pi_cython, [int(1e7) for i in range(10)])" ] }, { "cell_type": "code", "execution_count": 42, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "array([ 3.1412268, 3.1412268, 3.1412268, 3.1412268, 3.1400984,\n", " 3.1400984, 3.1400984, 3.1400984, 3.1408268, 3.1408268])" ] }, "execution_count": 42, "metadata": {}, "output_type": "execute_result" } ], "source": [ "np.array(res)" ] }, { "cell_type": "code", "execution_count": 43, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 16 ms, sys: 28 ms, total: 44 ms\n", "Wall time: 1.24 s\n" ] } ], "source": [ "%%time\n", "\n", "with mp.Pool(processes=4) as pool:\n", "    res = pool.map(mc_pi_cython, [int(1e4) for i in range(int(1e4))])" ] }, { "cell_type": "code", "execution_count": 45, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "array([ 3.1288, 3.1468, 3.1356, ..., 3.1228, 3.1272, 3.1036])" ] }, "execution_count": 45, "metadata": {}, "output_type": "execute_result" } ], "source": [ "np.array(res)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Creating individual processes" ] }, { "cell_type": "code", "execution_count": 46, "metadata": {}, "outputs": [], "source": [ "def f(i):\n", "    time.sleep(np.random.random())\n", "    print(os.getpid(), i)" ] }, { "cell_type": "code", "execution_count": 49, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "2248 0\n", "2251 1\n", "2254 2\n", "2257 3\n", "2260 4\n", "2263 5\n", "2266 6\n", "2269 7\n", "2272 8\n", "2275 9\n" ] } ], "source": [ "for i in range(10):\n", "    p = mp.Process(target=f, args=(i,))\n", "    p.start()\n", "    p.join()" ] }, { "cell_type": "markdown", 
"metadata": {}, "source": [ "### Functions with multiple arguments\n", "\n", "Multiprocessing `Pool` has a `starmap` method that removes the need to write a wrapper function." ] }, { "cell_type": "code", "execution_count": 50, "metadata": {}, "outputs": [], "source": [ "def f(a, b):\n", " return a + b" ] }, { "cell_type": "code", "execution_count": 51, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[1, 5, 9, 13, 17, 21, 25, 29, 33, 37, 41, 45]" ] }, "execution_count": 51, "metadata": {}, "output_type": "execute_result" } ], "source": [ "xs = np.arange(24)\n", "with Pool(processes=4) as pool:\n", " res = pool.starmap(f, np.array_split(xs, xs.shape[0]//2))\n", "list(res)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Partial application\n", "\n", "Sometimes, `functools.partial` can be used to reduce the number of arguments needed to just one." ] }, { "cell_type": "code", "execution_count": 52, "metadata": {}, "outputs": [], "source": [ "def f(a, b):\n", " return a * b" ] }, { "cell_type": "code", "execution_count": 53, "metadata": {}, "outputs": [], "source": [ "from functools import partial\n", "\n", "fp = partial(f, b=2)" ] }, { "cell_type": "code", "execution_count": 54, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "array([ 0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32,\n", " 34, 36, 38, 40, 42, 44, 46])" ] }, "execution_count": 54, "metadata": {}, "output_type": "execute_result" } ], "source": [ "xs = np.arange(24)\n", "with Pool(processes=4) as pool:\n", " res = pool.map(fp, xs)\n", "np.array(list(res))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### How do we get a return value from a process?" 
] }, { "cell_type": "code", "execution_count": 55, "metadata": {}, "outputs": [], "source": [ "def f1(q, i):\n", "    time.sleep(np.random.random())\n", "    q.put((os.getpid(), i))" ] }, { "cell_type": "code", "execution_count": 56, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[(2292, 0),\n", " (2294, 1),\n", " (2296, 2),\n", " (2298, 3),\n", " (2300, 4),\n", " (2302, 5),\n", " (2304, 6),\n", " (2306, 7),\n", " (2308, 8),\n", " (2310, 9)]" ] }, "execution_count": 56, "metadata": {}, "output_type": "execute_result" } ], "source": [ "q = mp.Queue()\n", "\n", "res = []\n", "for i in range(10):\n", "    p = mp.Process(target=f1, args=(q, i))\n", "    p.start()\n", "    res.append(q.get())\n", "    p.join()\n", "\n", "res" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Counting number of jobs (1)" ] }, { "cell_type": "code", "execution_count": 57, "metadata": {}, "outputs": [], "source": [ "def f2(i):\n", "    global counter\n", "    counter = counter + 1\n", "    print(os.getpid(), i)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Checking" ] }, { "cell_type": "code", "execution_count": 58, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "1997 10\n", "1\n" ] } ], "source": [ "counter = 0\n", "f2(10)\n", "print(counter)" ] }, { "cell_type": "code", "execution_count": 59, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "2312 0\n", "2315 1\n", "2318 2\n", "2321 3\n", "2324 4\n", "2327 5\n", "2330 6\n", "2333 7\n", "2336 8\n", "2339 9\n" ] } ], "source": [ "counter = 0\n", "\n", "for i in range(10):\n", "    p = mp.Process(target=f2, args=(i,))\n", "    p.start()\n", "    p.join()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Note that separate processes have their own memory and DO NOT share global memory" ] }, { "cell_type": "code", "execution_count": 60, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "0" ] }, "execution_count": 60, "metadata": {}, 
"output_type": "execute_result" } ], "source": [ "counter" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Counting number of jobs (2)\n", "\n", "We can use shared memory to do this, but it is slow because multiprocessing has to ensure that only one process gets to use counter at any one time. Multiprocesing provides Value and Array shared memory variables, but you can also convert arbitrary Python variables into shared memory objects (less efficient)." ] }, { "cell_type": "code", "execution_count": 61, "metadata": {}, "outputs": [], "source": [ "def f3(i, counter, store):\n", " counter.value += 1\n", " store[os.getpid() % 10] += i" ] }, { "cell_type": "code", "execution_count": 62, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "100\n", "[530, 540, 450, 460, 470, 480, 490, 500, 510, 520]\n", "CPU times: user 144 ms, sys: 316 ms, total: 460 ms\n", "Wall time: 792 ms\n" ] } ], "source": [ "%%time\n", "\n", "counter = mp.Value('i', 0)\n", "store = mp.Array('i', [0]*10)\n", "\n", "for i in range(int(1e2)):\n", " p = mp.Process(target=f3, args=(i, counter, store))\n", " p.start()\n", " p.join()\n", "\n", "print(counter.value)\n", "print(store[:])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Counting number of jobs (3)\n", "\n", "We should try to avoid using shared memory as much as possible in parallel jobs as they drastically reduce efficiency. One useful approach is to use the `map-reduce` pattern. We should also use Pool to reuse processes rather than spawn too many of them. We will see much more of the `map-reduc` approach when we work with Spark." ] }, { "cell_type": "code", "execution_count": 63, "metadata": {}, "outputs": [], "source": [ "def f4(i):\n", " return (os.getpid(), 1, i)" ] }, { "cell_type": "code", "execution_count": 64, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "100\n", "[ 0. 0. 1101. 369. 699. 237. 894. 723. 369. 
558.]\n", "CPU times: user 24 ms, sys: 44 ms, total: 68 ms\n", "Wall time: 179 ms\n" ] } ], "source": [ "%%time\n", "\n", "# map step\n", "with mp.Pool(processes=10) as pool:\n", "    res = pool.map(f4, range(int(1e2)))\n", "\n", "# reduce step\n", "res = np.array(res)\n", "\n", "counter = res[:, 1].sum()\n", "print(counter)\n", "\n", "store = np.zeros(10)\n", "idx = res[:, 0] % 10\n", "for i in range(10):\n", "    store[i] = res[idx==i, 2].sum()\n", "\n", "print(store)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Common issues with use of shared memory in parallel programs\n", "----\n", "\n", "Writing to shared memory requires careful coordination of processes, and the `multiprocessing` library implements many control and communication primitives for this purpose, including semaphores, locks and barriers. We will not cover these in detail because of their complexity. Instead, we decouple processes (leading to embarrassingly parallel problems) by making redundant copies of resources where necessary and reducing the results at a later stage. Most problems in statistical data analysis can be solved using this simple approach." ] }, { "cell_type": "markdown", "metadata": { "collapsed": true }, "source": [ "### Race conditions" ] }, { "cell_type": "markdown", "metadata": { "collapsed": true }, "source": [ "In the example below, up to 4 processes may try to increment `val` at the same time. Because `val.value += 1` takes separate steps (read the current value, add 1, write the result back), two or more processes can read the same old value. Their increments are then written back as a single increment, so updates are lost and the final count falls short of 1000." 
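] }, { "cell_type": "markdown", "metadata": {}, "source": [
"The lost updates described above can be prevented by holding a lock around the read-modify-write. A minimal sketch, assuming the Unix-only `fork` start method; the names `add_many` and `locked_count` are hypothetical. `Value.get_lock()` returns the lock that guards the shared value.\n",
"\n",
"```python\n",
"import multiprocessing as mp\n",
"\n",
"# Assumption: the Unix-only 'fork' start method is available\n",
"ctx = mp.get_context('fork')\n",
"\n",
"def add_many(val, n):\n",
"    for _ in range(n):\n",
"        # Only one process at a time may read, add 1 and write back\n",
"        with val.get_lock():\n",
"            val.value += 1\n",
"\n",
"def locked_count(nprocs, n):\n",
"    val = ctx.Value('i', 0)\n",
"    procs = [ctx.Process(target=add_many, args=(val, n)) for _ in range(nprocs)]\n",
"    for p in procs:\n",
"        p.start()\n",
"    for p in procs:\n",
"        p.join()\n",
"    return val.value\n",
"\n",
"print(locked_count(4, 250))\n",
"```\n",
"\n",
"The count is now exact, but every increment waits for the lock, which is the efficiency cost mentioned earlier. The next cell shows what happens without the lock."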
] }, { "cell_type": "code", "execution_count": 65, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "562\n", "675\n", "418\n" ] } ], "source": [ "def count1(i):\n", "    val.value += 1\n", "\n", "for run in range(3):\n", "    val = Value('i', 0)\n", "    with Pool(processes=4) as pool:\n", "        pool.map(count1, range(1000))\n", "\n", "    print(val.value)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It is usually easier and faster to give each process its own copy of a resource (here, a slot in the array chosen from its process ID) so that processes do not update the same location." ] }, { "cell_type": "code", "execution_count": 66, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[378, 307, 63, 252] 1000\n", "[252, 315, 126, 307] 1000\n", "[307, 63, 504, 126] 1000\n" ] } ], "source": [ "def count2(i):\n", "    ix = os.getpid() % 4\n", "    arr[ix] += 1\n", "\n", "for run in range(3):\n", "    arr = Array('i', [0]*4)\n", "\n", "    with Pool(processes=4) as pool:\n", "        pool.map(count2, range(1000))\n", "\n", "    print(arr[:], np.sum(arr))" ] }, { "cell_type": "markdown", "metadata": { "collapsed": true }, "source": [ "### Deadlock\n", "\n", "Suppose there are two processes P1 and P2 and two resources A and B. Suppose P1 holds a `lock` on A and will only release it after it acquires B, while P2 holds a `lock` on B and will only release it after it acquires A. The two processes are doomed to wait forever. This is known as a deadlock, and it can occur when concurrent processes compete for exclusive access to the same shared resources. A classic model of deadlock is the Dining Philosophers Problem.\n", "\n", "We will not show any examples since a deadlock would simply freeze the notebook." 
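] }, { "cell_type": "markdown", "metadata": {}, "source": [
"Although a real deadlock is not shown here, one standard way to avoid the circular wait described above is to make every worker acquire its locks in the same global order. The sketch below uses threads and `threading.Lock` as a lightweight stand-in for processes; the names `worker`, `lock_a` and `lock_b` are hypothetical.\n",
"\n",
"```python\n",
"import threading\n",
"\n",
"lock_a = threading.Lock()\n",
"lock_b = threading.Lock()\n",
"finished = []\n",
"\n",
"def worker(name, first, second):\n",
"    # Sort the two locks by id() so that every worker takes them in the same order\n",
"    lo, hi = sorted((first, second), key=id)\n",
"    with lo:\n",
"        with hi:\n",
"            finished.append(name)\n",
"\n",
"# P1 asks for (A, B) and P2 for (B, A), the pattern that can deadlock\n",
"t1 = threading.Thread(target=worker, args=('P1', lock_a, lock_b))\n",
"t2 = threading.Thread(target=worker, args=('P2', lock_b, lock_a))\n",
"for t in (t1, t2):\n",
"    t.start()\n",
"for t in (t1, t2):\n",
"    t.join(timeout=5)\n",
"\n",
"print(sorted(finished))\n",
"```\n",
"\n",
"Because both workers request the lower-ordered lock first, neither can hold one lock while waiting for a lock the other already holds."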
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.3" } }, "nbformat": 4, "nbformat_minor": 1 }