Streaming and functional programming

In [2]:
# standard libraries
import math
import os
import gzip
from glob import glob
import itertools as it

# 3rd party libraries
import toolz as tz
import toolz.curried as c
import numpy as np

Understanding toolz

concat

In [54]:
list(tz.take(10, tz.concat((range(3), 'abc', it.count(5,2)))))
Out[54]:
[0, 1, 2, 'a', 'b', 'c', 5, 7, 9, 11]

Implementation with regular Python

In [55]:
def concat(args):
    for arg in args:
        yield from arg
In [56]:
list(tz.take(10, concat((range(3), 'abc', it.count(5,2)))))
Out[56]:
[0, 1, 2, 'a', 'b', 'c', 5, 7, 9, 11]

frequencies

In [60]:
dna = np.random.choice(list('ACTG'), 1000, p=[0.1, 0.2, 0.3, 0.4])
dna = ''.join(dna)
dna
Out[60]:
'GCTGCTCTATGCCGGCGACTGTTCCGGTGGCTGGTGTCGGTCTGGTGATTTTCGCCTTCCTTTTTGCATCTCTCTCGTTGTCGTTGCGATGCCCATTCTTACGAGACTGGCATTGAAACGTGCTGTCACTTGGGGCCGTGGTGTCCTGTCTTTGCGGTGCTTTGGGCGGGAGGGGGCTGCTGTATGGGCTGTGGGGCTGAGCAGTACGGTGTCTCTCCGGGCATTCCAGGGGTCTCCTTAGGTCGATACCCCTCCATGTTGAATTGGCGTGACCAGGGCATGTGACCTCGAGGGGGGCCCTGCTACGCGGGTTCGATCAGTGTGTAGCGGGCTGCAGTTGCTATGTCGGTTTCCGAGTGGGGGTCGCTGGGGCTAGCCTTCAGGCGCTGTTTCGGGGCGTGTGTAACTTTCAAACCTCTGGGCGTGTTCCCGACTGTTTTTGTTTCTCCTGTTTATCTGGAAGAGTTGGTCGTTTGAGGGTCAAGCAAGTCGCCGTTTTTTTGTCATTTGGCTGCCGTTTGTAGAGCCAAGTTGGTGGTGATTTCTTCGGGTAGCTGGCCTTGTGCTGAGTGTGGGATCTCTAGTCTAATGCCTGGTTCTGGGTCGTTAGGGTACGTACCTGGCGCGCCAACGTTACGACTGAGGCCGTGCTTTCTCGTGGGGCCGGGGGGACCTCGGCGGGTCGCGCCTTTTGGTGAGGCGGATACCTGGCTCGCTCGGGACCTGAGTTCATTTGTTGGGAGCGGCCCGTCAACGCGTGCCTGATGGGGATTGCTCCGGGGCTCGGTGGTGTTGTTTGGCTTTTACCGCAAGGTGGGTCGTTGTCGCGGGTTGGGGGGGATTTGGCTTTGCGTGTCTCGTCGTTCACTGTGGTCTTAGTTCTCGTTGACTGCAAGAGGAGTGGTTTCTCGGCGTTTTGTGTGTCTAGTACTCGGGCTGTAGCGTCGGGACTGTGGTGGCGCCATCCCAGGTTGACCGCCTGGGTGTGGCCAGCACAGTG'

Raw frequencies

In [61]:
tz.frequencies(dna)
Out[61]:
{'A': 109, 'C': 229, 'G': 357, 'T': 305}

Implementation with regular Python

In [65]:
def frequencies(seq):
    d = {}
    for x in seq:
        d[x] = d.get(x, 0) + 1
    return d
In [66]:
frequencies(dna)
Out[66]:
{'A': 109, 'C': 229, 'G': 357, 'T': 305}

sliding_window

In [69]:
list(tz.take(10, tz.sliding_window(2, dna)))
Out[69]:
[('G', 'C'),
 ('C', 'T'),
 ('T', 'G'),
 ('G', 'C'),
 ('C', 'T'),
 ('T', 'C'),
 ('C', 'T'),
 ('T', 'A'),
 ('A', 'T'),
 ('T', 'G')]

Implementation with regular Python

In [70]:
def sliding_window(n, seq):
    tuples = (it.islice(s, i, None) for i, s in enumerate(it.tee(seq, n)))
    yield from zip(*tuples)
In [71]:
list(tz.take(10, sliding_window(2, dna)))
Out[71]:
[('G', 'C'),
 ('C', 'T'),
 ('T', 'G'),
 ('G', 'C'),
 ('C', 'T'),
 ('T', 'C'),
 ('C', 'T'),
 ('T', 'A'),
 ('A', 'T'),
 ('T', 'G')]

partition

In [76]:
list(tz.take(10, tz.partition(5, dna)))
Out[76]:
[('G', 'C', 'T', 'G', 'C'),
 ('T', 'C', 'T', 'A', 'T'),
 ('G', 'C', 'C', 'G', 'G'),
 ('C', 'G', 'A', 'C', 'T'),
 ('G', 'T', 'T', 'C', 'C'),
 ('G', 'G', 'T', 'G', 'G'),
 ('C', 'T', 'G', 'G', 'T'),
 ('G', 'T', 'C', 'G', 'G'),
 ('T', 'C', 'T', 'G', 'G'),
 ('T', 'G', 'A', 'T', 'T')]

Implementation with regular Python

In [85]:
def partition(n, seq):
    xs = iter(seq)
    item = tuple(it.islice(xs, n))
    while len(item) == n:
        yield item
        item = tuple(it.islice(xs, n))
In [86]:
list(tz.take(10, partition(5, dna)))
Out[86]:
[('G', 'C', 'T', 'G', 'C'),
 ('T', 'C', 'T', 'A', 'T'),
 ('G', 'C', 'C', 'G', 'G'),
 ('C', 'G', 'A', 'C', 'T'),
 ('G', 'T', 'T', 'C', 'C'),
 ('G', 'G', 'T', 'G', 'G'),
 ('C', 'T', 'G', 'G', 'T'),
 ('G', 'T', 'C', 'G', 'G'),
 ('T', 'C', 'T', 'G', 'G'),
 ('T', 'G', 'A', 'T', 'T')]

partition_all

Regular partiiton ignores incomplete partitions.

In [92]:
list(tz.take(10, tz.partition(5, range(13))))
Out[92]:
[(0, 1, 2, 3, 4), (5, 6, 7, 8, 9)]

partition_all includes incomplete partitions.

In [89]:
list(tz.take(10, tz.partition_all(5, range(13))))
Out[89]:
[(0, 1, 2, 3, 4), (5, 6, 7, 8, 9), (10, 11, 12)]

Challenge: The version of partition in toolz has an optional pad argument. Implement this.

In [95]:
list(tz.take(10, tz.partition(5, range(13), pad='X')))
Out[95]:
[(0, 1, 2, 3, 4), (5, 6, 7, 8, 9), (10, 11, 12, 'X', 'X')]

Implementation with regular Python

In [90]:
def partition_all(n, seq):
    xs = iter(seq)
    item = tuple(it.islice(xs, n))
    while item:
        yield item
        item = tuple(it.islice(xs, n))
In [91]:
list(tz.take(10, partition_all(5, range(13))))
Out[91]:
[(0, 1, 2, 3, 4), (5, 6, 7, 8, 9), (10, 11, 12)]

curry

In [96]:
def f(a, b, c):
    return a, b, c
In [102]:
f1 = tz.curry(f)
f1(1)(2)(3)
Out[102]:
(1, 2, 3)
In [103]:
f2 = tz.curry(f, 1, 2)
In [104]:
f2(3)
Out[104]:
(1, 2, 3)
In [109]:
f3 = tz.curry(f, c=3)
In [110]:
f3(1, 2)
Out[110]:
(1, 2, 3)

The toolz package also provides “curried” versions of its functions in toolz.curried (which we have imported as c).

In [112]:
f = c.take(10)
list(f(it.count()))
Out[112]:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
In [113]:
g = c.map(lambda x: x**2)
list(f(g(it.count())))
Out[113]:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

pipe

Like the pipe functionality in R and bash, you can pipe data though through a series of functions. This can make functional programs much more readable.

In [114]:
tz.pipe(
    it.count(),
    c.map(lambda x: x**2),
    c.take(10),
    sum
)
Out[114]:
285

Exercises

1. Rewrite this monstrosity using pipe and curry.

tz.reduce(lambda x, y: x + y**2, tz.drop(5, tz.filter(lambda x: x % 2 == 0, tz.take(20, tz.map(lambda x: x**2, it.count())))))
In [5]:
tz.reduce(lambda x, y: x + y**2, tz.drop(5, tz.filter(lambda x: x % 2 == 0, tz.take(20, tz.map(lambda x: x**2, it.count())))))
Out[5]:
229764
In [6]:
tz.pipe(
    it.count(),
    c.map(lambda x: x**2),
    c.take(20),
    c.filter(lambda x: x % 2 == 0),
    c.drop(5),
    c.reduce(lambda x, y: x + y**2)
)
Out[6]:
229764

2. How could you code this using only the Python standard library?

In [7]:
xs = it.count()
xs = (x**2 for x in xs)
xs = it.islice(xs, 20)
xs = (x for x in xs if x % 2 == 0)
xs = it.islice(xs, 5, None)
xs = sum(x**2 for x in xs)
xs
Out[7]:
239664

3. Write a function to calculate the running mean of a possibly infinite stream of numbers.

In [36]:
def rmean(seq):
    s = 0
    n = 0
    for x in seq:
        s += x
        n += 1
        yield s/n
In [38]:
list(tz.take(10, rmean(it.count())))
Out[38]:
[0.0, 0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5]

4. Write a function for running mean to update in mini-batches of size n

In [42]:
def rmean_mb(size, seq):
    s = 0
    n = 0
    for x in tz.partition_all(size, seq):
        s += sum(x)
        n += len(x)
        yield s/n
In [43]:
list(tz.take(10, rmean_mb(3, it.count())))
Out[43]:
[1.0, 2.5, 4.0, 5.5, 7.0, 8.5, 10.0, 11.5, 13.0, 14.5]

5. Count the frequencies of 2-tuples formed by reading from all files with name numbers???.txt one line at a time, ignoring comment lines, and including tuples that wrap around the end of the line. The numbers???.txt files can be generated using the code below.

In [8]:
num_files = 3
num_lines = 100000
for i in range(num_files):
    with open('numbers%03d.txt' % i, 'w') as f:
        for j in range(num_lines):
            if np.random.rand() < 0.3:
                items = '#'
            else:
                items = np.random.randint(0, 5, 10)
            f.write('\t'.join(map(str, items)) + '\n')
In [28]:
def freqs(paths):
    ans = tz.pipe(
        paths,
        glob,
        c.map(open),
        tz.concat,
        c.filter(lambda line: not line.startswith('#')),
        c.map(str.split),
        tz.concat,
        c.map(int),
        c.sliding_window(2),
        tz.frequencies
    )
    return ans
In [29]:
freqs('numbers???.txt')
Out[29]:
{(0, 0): 84268,
 (0, 1): 83326,
 (0, 2): 84399,
 (0, 3): 84153,
 (0, 4): 83620,
 (1, 0): 83723,
 (1, 1): 83756,
 (1, 2): 83952,
 (1, 3): 83608,
 (1, 4): 83269,
 (2, 0): 83858,
 (2, 1): 84105,
 (2, 2): 84428,
 (2, 3): 84063,
 (2, 4): 83778,
 (3, 0): 84404,
 (3, 1): 83750,
 (3, 2): 84055,
 (3, 3): 84015,
 (3, 4): 84198,
 (4, 0): 83513,
 (4, 1): 83370,
 (4, 2): 83398,
 (4, 3): 84583,
 (4, 4): 83507}

6. Find line numbers of comments in fot.txt.

In [31]:
%%file foo.txt
1,2
3,4
5,6 # first comment
7,8
9,10,
11,12 # second comment
Writing foo.txt

Option 1: Using pipe

In [48]:
def find_comment_line_numbers_1(filename):
    ans = tz.pipe(
        filename,
        open,
        enumerate,
        c.filter(lambda x: '#' in x[1]),
        c.map(lambda x: x[0])
    )
    return ans
In [49]:
ans = find_comment_line_numbers_1('foo.txt')
list(ans)
Out[49]:
[2, 5]

Option 2: Using regular Python

In [46]:
def find_comment_line_numbers_2(filename):
    with open(filename) as f:
        for i, line in enumerate(f):
            if '#' in line:
                yield i
In [47]:
ans = find_comment_line_numbers_2('foo.txt')
list(ans)
Out[47]:
[2, 5]
In [ ]: