{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Streaming and functional programming" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# standard libraries\n", "import math\n", "import os\n", "import gzip\n", "from glob import glob\n", "import itertools as it\n", "\n", "# 3rd party libraries\n", "import toolz as tz\n", "import toolz.curried as c\n", "import numpy as np" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Understanding `toolz`" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### `concat`" ] }, { "cell_type": "code", "execution_count": 54, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[0, 1, 2, 'a', 'b', 'c', 5, 7, 9, 11]" ] }, "execution_count": 54, "metadata": {}, "output_type": "execute_result" } ], "source": [ "list(tz.take(10, tz.concat((range(3), 'abc', it.count(5,2)))))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Implementation with regular Python" ] }, { "cell_type": "code", "execution_count": 55, "metadata": { "collapsed": true }, "outputs": [], "source": [ "def concat(args):\n", " for arg in args:\n", " yield from arg" ] }, { "cell_type": "code", "execution_count": 56, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[0, 1, 2, 'a', 'b', 'c', 5, 7, 9, 11]" ] }, "execution_count": 56, "metadata": {}, "output_type": "execute_result" } ], "source": [ "list(tz.take(10, concat((range(3), 'abc', it.count(5,2)))))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### `frequencies`" ] }, { "cell_type": "code", "execution_count": 60, "metadata": {}, "outputs": [ { "data": { "text/plain": [ 
"'GCTGCTCTATGCCGGCGACTGTTCCGGTGGCTGGTGTCGGTCTGGTGATTTTCGCCTTCCTTTTTGCATCTCTCTCGTTGTCGTTGCGATGCCCATTCTTACGAGACTGGCATTGAAACGTGCTGTCACTTGGGGCCGTGGTGTCCTGTCTTTGCGGTGCTTTGGGCGGGAGGGGGCTGCTGTATGGGCTGTGGGGCTGAGCAGTACGGTGTCTCTCCGGGCATTCCAGGGGTCTCCTTAGGTCGATACCCCTCCATGTTGAATTGGCGTGACCAGGGCATGTGACCTCGAGGGGGGCCCTGCTACGCGGGTTCGATCAGTGTGTAGCGGGCTGCAGTTGCTATGTCGGTTTCCGAGTGGGGGTCGCTGGGGCTAGCCTTCAGGCGCTGTTTCGGGGCGTGTGTAACTTTCAAACCTCTGGGCGTGTTCCCGACTGTTTTTGTTTCTCCTGTTTATCTGGAAGAGTTGGTCGTTTGAGGGTCAAGCAAGTCGCCGTTTTTTTGTCATTTGGCTGCCGTTTGTAGAGCCAAGTTGGTGGTGATTTCTTCGGGTAGCTGGCCTTGTGCTGAGTGTGGGATCTCTAGTCTAATGCCTGGTTCTGGGTCGTTAGGGTACGTACCTGGCGCGCCAACGTTACGACTGAGGCCGTGCTTTCTCGTGGGGCCGGGGGGACCTCGGCGGGTCGCGCCTTTTGGTGAGGCGGATACCTGGCTCGCTCGGGACCTGAGTTCATTTGTTGGGAGCGGCCCGTCAACGCGTGCCTGATGGGGATTGCTCCGGGGCTCGGTGGTGTTGTTTGGCTTTTACCGCAAGGTGGGTCGTTGTCGCGGGTTGGGGGGGATTTGGCTTTGCGTGTCTCGTCGTTCACTGTGGTCTTAGTTCTCGTTGACTGCAAGAGGAGTGGTTTCTCGGCGTTTTGTGTGTCTAGTACTCGGGCTGTAGCGTCGGGACTGTGGTGGCGCCATCCCAGGTTGACCGCCTGGGTGTGGCCAGCACAGTG'" ] }, "execution_count": 60, "metadata": {}, "output_type": "execute_result" } ], "source": [ "dna = np.random.choice(list('ACTG'), 1000, p=[0.1, 0.2, 0.3, 0.4])\n", "dna = ''.join(dna)\n", "dna" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Raw frequencies" ] }, { "cell_type": "code", "execution_count": 61, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "{'A': 109, 'C': 229, 'G': 357, 'T': 305}" ] }, "execution_count": 61, "metadata": {}, "output_type": "execute_result" } ], "source": [ "tz.frequencies(dna)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Implementation with regular Python" ] }, { "cell_type": "code", "execution_count": 65, "metadata": { "collapsed": true }, "outputs": [], "source": [ "def frequencies(seq):\n", " d = {}\n", " for x in seq:\n", " d[x] = d.get(x, 0) + 1\n", " return d" ] }, { "cell_type": "code", "execution_count": 66, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "{'A': 109, 'C': 229, 'G': 357, 
'T': 305}" ] }, "execution_count": 66, "metadata": {}, "output_type": "execute_result" } ], "source": [ "frequencies(dna)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### `sliding_window`" ] }, { "cell_type": "code", "execution_count": 69, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('G', 'C'),\n", " ('C', 'T'),\n", " ('T', 'G'),\n", " ('G', 'C'),\n", " ('C', 'T'),\n", " ('T', 'C'),\n", " ('C', 'T'),\n", " ('T', 'A'),\n", " ('A', 'T'),\n", " ('T', 'G')]" ] }, "execution_count": 69, "metadata": {}, "output_type": "execute_result" } ], "source": [ "list(tz.take(10, tz.sliding_window(2, dna)))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Implementation with regular Python" ] }, { "cell_type": "code", "execution_count": 70, "metadata": { "collapsed": true }, "outputs": [], "source": [ "def sliding_window(n, seq):\n", " tuples = (it.islice(s, i, None) for i, s in enumerate(it.tee(seq, n)))\n", " yield from zip(*tuples)" ] }, { "cell_type": "code", "execution_count": 71, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('G', 'C'),\n", " ('C', 'T'),\n", " ('T', 'G'),\n", " ('G', 'C'),\n", " ('C', 'T'),\n", " ('T', 'C'),\n", " ('C', 'T'),\n", " ('T', 'A'),\n", " ('A', 'T'),\n", " ('T', 'G')]" ] }, "execution_count": 71, "metadata": {}, "output_type": "execute_result" } ], "source": [ "list(tz.take(10, sliding_window(2, dna)))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### `partition`" ] }, { "cell_type": "code", "execution_count": 76, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('G', 'C', 'T', 'G', 'C'),\n", " ('T', 'C', 'T', 'A', 'T'),\n", " ('G', 'C', 'C', 'G', 'G'),\n", " ('C', 'G', 'A', 'C', 'T'),\n", " ('G', 'T', 'T', 'C', 'C'),\n", " ('G', 'G', 'T', 'G', 'G'),\n", " ('C', 'T', 'G', 'G', 'T'),\n", " ('G', 'T', 'C', 'G', 'G'),\n", " ('T', 'C', 'T', 'G', 'G'),\n", " ('T', 'G', 'A', 'T', 'T')]" ] }, "execution_count": 76, "metadata": {}, "output_type": "execute_result" } ],
"source": [ "list(tz.take(10, tz.partition(5, dna)))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Implementation with regular Python" ] }, { "cell_type": "code", "execution_count": 85, "metadata": { "collapsed": true }, "outputs": [], "source": [ "def partition(n, seq):\n", " xs = iter(seq)\n", " item = tuple(it.islice(xs, n))\n", " while len(item) == n:\n", " yield item\n", " item = tuple(it.islice(xs, n))" ] }, { "cell_type": "code", "execution_count": 86, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('G', 'C', 'T', 'G', 'C'),\n", " ('T', 'C', 'T', 'A', 'T'),\n", " ('G', 'C', 'C', 'G', 'G'),\n", " ('C', 'G', 'A', 'C', 'T'),\n", " ('G', 'T', 'T', 'C', 'C'),\n", " ('G', 'G', 'T', 'G', 'G'),\n", " ('C', 'T', 'G', 'G', 'T'),\n", " ('G', 'T', 'C', 'G', 'G'),\n", " ('T', 'C', 'T', 'G', 'G'),\n", " ('T', 'G', 'A', 'T', 'T')]" ] }, "execution_count": 86, "metadata": {}, "output_type": "execute_result" } ], "source": [ "list(tz.take(10, partition(5, dna)))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### `partition_all`" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Regular `partition` ignores incomplete partitions." ] }, { "cell_type": "code", "execution_count": 92, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[(0, 1, 2, 3, 4), (5, 6, 7, 8, 9)]" ] }, "execution_count": 92, "metadata": {}, "output_type": "execute_result" } ], "source": [ "list(tz.take(10, tz.partition(5, range(13))))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`partition_all` includes incomplete partitions."
] }, { "cell_type": "code", "execution_count": 89, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[(0, 1, 2, 3, 4), (5, 6, 7, 8, 9), (10, 11, 12)]" ] }, "execution_count": 89, "metadata": {}, "output_type": "execute_result" } ], "source": [ "list(tz.take(10, tz.partition_all(5, range(13))))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Challenge**: The version of `partition` in `toolz` has an optional `pad` argument. Implement this." ] }, { "cell_type": "code", "execution_count": 95, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[(0, 1, 2, 3, 4), (5, 6, 7, 8, 9), (10, 11, 12, 'X', 'X')]" ] }, "execution_count": 95, "metadata": {}, "output_type": "execute_result" } ], "source": [ "list(tz.take(10, tz.partition(5, range(13), pad='X')))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Implementation with regular Python" ] }, { "cell_type": "code", "execution_count": 90, "metadata": { "collapsed": true }, "outputs": [], "source": [ "def partition_all(n, seq):\n", " xs = iter(seq)\n", " item = tuple(it.islice(xs, n))\n", " while item:\n", " yield item\n", " item = tuple(it.islice(xs, n))" ] }, { "cell_type": "code", "execution_count": 91, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[(0, 1, 2, 3, 4), (5, 6, 7, 8, 9), (10, 11, 12)]" ] }, "execution_count": 91, "metadata": {}, "output_type": "execute_result" } ], "source": [ "list(tz.take(10, partition_all(5, range(13))))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### `curry`" ] }, { "cell_type": "code", "execution_count": 96, "metadata": { "collapsed": true }, "outputs": [], "source": [ "def f(a, b, c):\n", " return a, b, c" ] }, { "cell_type": "code", "execution_count": 102, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "(1, 2, 3)" ] }, "execution_count": 102, "metadata": {}, "output_type": "execute_result" } ], "source": [ "f1 = tz.curry(f)\n", "f1(1)(2)(3)" ] }, { "cell_type": "code", "execution_count": 103,
"metadata": { "collapsed": true }, "outputs": [], "source": [ "f2 = tz.curry(f, 1, 2)" ] }, { "cell_type": "code", "execution_count": 104, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "(1, 2, 3)" ] }, "execution_count": 104, "metadata": {}, "output_type": "execute_result" } ], "source": [ "f2(3)" ] }, { "cell_type": "code", "execution_count": 109, "metadata": { "collapsed": true }, "outputs": [], "source": [ "f3 = tz.curry(f, c=3)" ] }, { "cell_type": "code", "execution_count": 110, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "(1, 2, 3)" ] }, "execution_count": 110, "metadata": {}, "output_type": "execute_result" } ], "source": [ "f3(1, 2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `toolz` package also provides \"curried\" versions of its functions in `toolz.curried` (which we have imported as `c`). " ] }, { "cell_type": "code", "execution_count": 112, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]" ] }, "execution_count": 112, "metadata": {}, "output_type": "execute_result" } ], "source": [ "f = c.take(10)\n", "list(f(it.count()))" ] }, { "cell_type": "code", "execution_count": 113, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]" ] }, "execution_count": 113, "metadata": {}, "output_type": "execute_result" } ], "source": [ "g = c.map(lambda x: x**2)\n", "list(f(g(it.count())))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### `pipe`\n", "\n", "Like the pipe functionality in `R` and `bash`, you can pipe data through a series of functions. This can make functional programs much more readable."
] }, { "cell_type": "code", "execution_count": 114, "metadata": { "scrolled": true }, "outputs": [ { "data": { "text/plain": [ "285" ] }, "execution_count": 114, "metadata": {}, "output_type": "execute_result" } ], "source": [ "tz.pipe(\n", " it.count(),\n", " c.map(lambda x: x**2),\n", " c.take(10),\n", " sum\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Exercises" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**1**. Rewrite this monstrosity using `pipe` and `curry`.\n", "```python\n", "tz.reduce(lambda x, y: x + y**2, tz.drop(5, tz.filter(lambda x: x % 2 == 0, tz.take(20, tz.map(lambda x: x**2, it.count())))))\n", "```" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "229764" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "tz.reduce(lambda x, y: x + y**2, tz.drop(5, tz.filter(lambda x: x % 2 == 0, tz.take(20, tz.map(lambda x: x**2, it.count())))))" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "229764" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "tz.pipe(\n", " it.count(),\n", " c.map(lambda x: x**2),\n", " c.take(20),\n", " c.filter(lambda x: x % 2 == 0),\n", " c.drop(5),\n", " c.reduce(lambda x, y: x + y**2)\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**2**. How could you code this using only the Python standard library?" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "239664" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "xs = it.count()\n", "xs = (x**2 for x in xs)\n", "xs = it.islice(xs, 20)\n", "xs = (x for x in xs if x % 2 == 0)\n", "xs = it.islice(xs, 5, None)\n", "xs = sum(x**2 for x in xs)\n", "xs" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**3**. 
Write a function to calculate the running mean of a possibly infinite stream of numbers." ] }, { "cell_type": "code", "execution_count": 36, "metadata": { "collapsed": true }, "outputs": [], "source": [ "def rmean(seq):\n", " s = 0\n", " n = 0\n", " for x in seq:\n", " s += x\n", " n += 1\n", " yield s/n" ] }, { "cell_type": "code", "execution_count": 38, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[0.0, 0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5]" ] }, "execution_count": 38, "metadata": {}, "output_type": "execute_result" } ], "source": [ "list(tz.take(10, rmean(it.count())))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**4**. Write a function that updates the running mean in mini-batches of size `n`." ] }, { "cell_type": "code", "execution_count": 42, "metadata": { "collapsed": true }, "outputs": [], "source": [ "def rmean_mb(size, seq):\n", " s = 0\n", " n = 0\n", " for x in tz.partition_all(size, seq):\n", " s += sum(x)\n", " n += len(x)\n", " yield s/n" ] }, { "cell_type": "code", "execution_count": 43, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[1.0, 2.5, 4.0, 5.5, 7.0, 8.5, 10.0, 11.5, 13.0, 14.5]" ] }, "execution_count": 43, "metadata": {}, "output_type": "execute_result" } ], "source": [ "list(tz.take(10, rmean_mb(3, it.count())))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**5**. Count the frequencies of 2-tuples formed by reading from all files with name `numbers???.txt` one line at a time, ignoring comment lines, and including tuples that wrap around the end of the line. The `numbers???.txt` files can be generated using the code below."
] }, { "cell_type": "code", "execution_count": 8, "metadata": { "collapsed": true }, "outputs": [], "source": [ "num_files = 3\n", "num_lines = 100000\n", "for i in range(num_files):\n", " with open('numbers%03d.txt' % i, 'w') as f:\n", " for j in range(num_lines):\n", " if np.random.rand() < 0.3:\n", " items = '#'\n", " else:\n", " items = np.random.randint(0, 5, 10)\n", " f.write('\\t'.join(map(str, items)) + '\\n')" ] }, { "cell_type": "code", "execution_count": 28, "metadata": {}, "outputs": [], "source": [ "def freqs(paths):\n", " ans = tz.pipe(\n", " paths,\n", " glob,\n", " c.map(open),\n", " tz.concat,\n", " c.filter(lambda line: not line.startswith('#')),\n", " c.map(str.split),\n", " tz.concat,\n", " c.map(int),\n", " c.sliding_window(2),\n", " tz.frequencies\n", " )\n", " return ans" ] }, { "cell_type": "code", "execution_count": 29, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "{(0, 0): 84268,\n", " (0, 1): 83326,\n", " (0, 2): 84399,\n", " (0, 3): 84153,\n", " (0, 4): 83620,\n", " (1, 0): 83723,\n", " (1, 1): 83756,\n", " (1, 2): 83952,\n", " (1, 3): 83608,\n", " (1, 4): 83269,\n", " (2, 0): 83858,\n", " (2, 1): 84105,\n", " (2, 2): 84428,\n", " (2, 3): 84063,\n", " (2, 4): 83778,\n", " (3, 0): 84404,\n", " (3, 1): 83750,\n", " (3, 2): 84055,\n", " (3, 3): 84015,\n", " (3, 4): 84198,\n", " (4, 0): 83513,\n", " (4, 1): 83370,\n", " (4, 2): 83398,\n", " (4, 3): 84583,\n", " (4, 4): 83507}" ] }, "execution_count": 29, "metadata": {}, "output_type": "execute_result" } ], "source": [ "freqs('numbers???.txt')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**6**. Find line numbers of comments in `foo.txt`."
] }, { "cell_type": "code", "execution_count": 31, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Writing foo.txt\n" ] } ], "source": [ "%%file foo.txt\n", "1,2\n", "3,4\n", "5,6 # first comment\n", "7,8\n", "9,10,\n", "11,12 # second comment" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Option 1: Using `pipe`" ] }, { "cell_type": "code", "execution_count": 48, "metadata": {}, "outputs": [], "source": [ "def find_comment_line_numbers_1(filename):\n", " ans = tz.pipe(\n", " filename,\n", " open,\n", " enumerate,\n", " c.filter(lambda x: '#' in x[1]),\n", " c.map(lambda x: x[0])\n", " )\n", " return ans" ] }, { "cell_type": "code", "execution_count": 49, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[2, 5]" ] }, "execution_count": 49, "metadata": {}, "output_type": "execute_result" } ], "source": [ "ans = find_comment_line_numbers_1('foo.txt')\n", "list(ans)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Option 2: Using regular Python" ] }, { "cell_type": "code", "execution_count": 46, "metadata": {}, "outputs": [], "source": [ "def find_comment_line_numbers_2(filename):\n", " with open(filename) as f:\n", " for i, line in enumerate(f):\n", " if '#' in line:\n", " yield i" ] }, { "cell_type": "code", "execution_count": 47, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[2, 5]" ] }, "execution_count": 47, "metadata": {}, "output_type": "execute_result" } ], "source": [ "ans = find_comment_line_numbers_2('foo.txt')\n", "list(ans)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.1" } }, 
"nbformat": 4, "nbformat_minor": 2 }