Streaming and functional programming¶
In [2]:
# standard libraries
import math
import os
import gzip
from glob import glob
import itertools as it
# 3rd party libraries
import toolz as tz
import toolz.curried as c
import numpy as np
Understanding toolz
¶
concat
¶
In [54]:
list(tz.take(10, tz.concat((range(3), 'abc', it.count(5,2)))))
Out[54]:
[0, 1, 2, 'a', 'b', 'c', 5, 7, 9, 11]
Implementation with regular Python¶
In [55]:
def concat(args):
for arg in args:
yield from arg
In [56]:
list(tz.take(10, concat((range(3), 'abc', it.count(5,2)))))
Out[56]:
[0, 1, 2, 'a', 'b', 'c', 5, 7, 9, 11]
frequencies
¶
In [60]:
dna = np.random.choice(list('ACTG'), 1000, p=[0.1, 0.2, 0.3, 0.4])
dna = ''.join(dna)
dna
Out[60]:
'GCTGCTCTATGCCGGCGACTGTTCCGGTGGCTGGTGTCGGTCTGGTGATTTTCGCCTTCCTTTTTGCATCTCTCTCGTTGTCGTTGCGATGCCCATTCTTACGAGACTGGCATTGAAACGTGCTGTCACTTGGGGCCGTGGTGTCCTGTCTTTGCGGTGCTTTGGGCGGGAGGGGGCTGCTGTATGGGCTGTGGGGCTGAGCAGTACGGTGTCTCTCCGGGCATTCCAGGGGTCTCCTTAGGTCGATACCCCTCCATGTTGAATTGGCGTGACCAGGGCATGTGACCTCGAGGGGGGCCCTGCTACGCGGGTTCGATCAGTGTGTAGCGGGCTGCAGTTGCTATGTCGGTTTCCGAGTGGGGGTCGCTGGGGCTAGCCTTCAGGCGCTGTTTCGGGGCGTGTGTAACTTTCAAACCTCTGGGCGTGTTCCCGACTGTTTTTGTTTCTCCTGTTTATCTGGAAGAGTTGGTCGTTTGAGGGTCAAGCAAGTCGCCGTTTTTTTGTCATTTGGCTGCCGTTTGTAGAGCCAAGTTGGTGGTGATTTCTTCGGGTAGCTGGCCTTGTGCTGAGTGTGGGATCTCTAGTCTAATGCCTGGTTCTGGGTCGTTAGGGTACGTACCTGGCGCGCCAACGTTACGACTGAGGCCGTGCTTTCTCGTGGGGCCGGGGGGACCTCGGCGGGTCGCGCCTTTTGGTGAGGCGGATACCTGGCTCGCTCGGGACCTGAGTTCATTTGTTGGGAGCGGCCCGTCAACGCGTGCCTGATGGGGATTGCTCCGGGGCTCGGTGGTGTTGTTTGGCTTTTACCGCAAGGTGGGTCGTTGTCGCGGGTTGGGGGGGATTTGGCTTTGCGTGTCTCGTCGTTCACTGTGGTCTTAGTTCTCGTTGACTGCAAGAGGAGTGGTTTCTCGGCGTTTTGTGTGTCTAGTACTCGGGCTGTAGCGTCGGGACTGTGGTGGCGCCATCCCAGGTTGACCGCCTGGGTGTGGCCAGCACAGTG'
Raw frequencies
In [61]:
tz.frequencies(dna)
Out[61]:
{'A': 109, 'C': 229, 'G': 357, 'T': 305}
Implementation with regular Python¶
In [65]:
def frequencies(seq):
d = {}
for x in seq:
d[x] = d.get(x, 0) + 1
return d
In [66]:
frequencies(dna)
Out[66]:
{'A': 109, 'C': 229, 'G': 357, 'T': 305}
sliding_window
¶
In [69]:
list(tz.take(10, tz.sliding_window(2, dna)))
Out[69]:
[('G', 'C'),
('C', 'T'),
('T', 'G'),
('G', 'C'),
('C', 'T'),
('T', 'C'),
('C', 'T'),
('T', 'A'),
('A', 'T'),
('T', 'G')]
Implementation with regular Python¶
In [70]:
def sliding_window(n, seq):
tuples = (it.islice(s, i, None) for i, s in enumerate(it.tee(seq, n)))
yield from zip(*tuples)
In [71]:
list(tz.take(10, sliding_window(2, dna)))
Out[71]:
[('G', 'C'),
('C', 'T'),
('T', 'G'),
('G', 'C'),
('C', 'T'),
('T', 'C'),
('C', 'T'),
('T', 'A'),
('A', 'T'),
('T', 'G')]
partition
¶
In [76]:
list(tz.take(10, tz.partition(5, dna)))
Out[76]:
[('G', 'C', 'T', 'G', 'C'),
('T', 'C', 'T', 'A', 'T'),
('G', 'C', 'C', 'G', 'G'),
('C', 'G', 'A', 'C', 'T'),
('G', 'T', 'T', 'C', 'C'),
('G', 'G', 'T', 'G', 'G'),
('C', 'T', 'G', 'G', 'T'),
('G', 'T', 'C', 'G', 'G'),
('T', 'C', 'T', 'G', 'G'),
('T', 'G', 'A', 'T', 'T')]
Implementation with regular Python¶
In [85]:
def partition(n, seq):
xs = iter(seq)
item = tuple(it.islice(xs, n))
while len(item) == n:
yield item
item = tuple(it.islice(xs, n))
In [86]:
list(tz.take(10, partition(5, dna)))
Out[86]:
[('G', 'C', 'T', 'G', 'C'),
('T', 'C', 'T', 'A', 'T'),
('G', 'C', 'C', 'G', 'G'),
('C', 'G', 'A', 'C', 'T'),
('G', 'T', 'T', 'C', 'C'),
('G', 'G', 'T', 'G', 'G'),
('C', 'T', 'G', 'G', 'T'),
('G', 'T', 'C', 'G', 'G'),
('T', 'C', 'T', 'G', 'G'),
('T', 'G', 'A', 'T', 'T')]
partition_all
¶
Regular partiiton
ignores incomplete partitions.
In [92]:
list(tz.take(10, tz.partition(5, range(13))))
Out[92]:
[(0, 1, 2, 3, 4), (5, 6, 7, 8, 9)]
partition_all
includes incomplete partitions.
In [89]:
list(tz.take(10, tz.partition_all(5, range(13))))
Out[89]:
[(0, 1, 2, 3, 4), (5, 6, 7, 8, 9), (10, 11, 12)]
Challenge: The version of partition
in toolz
has an optional
pad
argument. Implement this.
In [95]:
list(tz.take(10, tz.partition(5, range(13), pad='X')))
Out[95]:
[(0, 1, 2, 3, 4), (5, 6, 7, 8, 9), (10, 11, 12, 'X', 'X')]
Implementation with regular Python¶
In [90]:
def partition_all(n, seq):
xs = iter(seq)
item = tuple(it.islice(xs, n))
while item:
yield item
item = tuple(it.islice(xs, n))
In [91]:
list(tz.take(10, partition_all(5, range(13))))
Out[91]:
[(0, 1, 2, 3, 4), (5, 6, 7, 8, 9), (10, 11, 12)]
curry
¶
In [96]:
def f(a, b, c):
return a, b, c
In [102]:
f1 = tz.curry(f)
f1(1)(2)(3)
Out[102]:
(1, 2, 3)
In [103]:
f2 = tz.curry(f, 1, 2)
In [104]:
f2(3)
Out[104]:
(1, 2, 3)
In [109]:
f3 = tz.curry(f, c=3)
In [110]:
f3(1, 2)
Out[110]:
(1, 2, 3)
The toolz
package also provides “curried” versions of its functions
in toolz.curried
(which we have imported as c
).
In [112]:
f = c.take(10)
list(f(it.count()))
Out[112]:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
In [113]:
g = c.map(lambda x: x**2)
list(f(g(it.count())))
Out[113]:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
pipe
¶
Like the pipe functionality in R
and bash
, you can pipe data
though through a series of functions. This can make functional programs
much more readable.
In [114]:
tz.pipe(
it.count(),
c.map(lambda x: x**2),
c.take(10),
sum
)
Out[114]:
285
Exercises¶
1. Rewrite this monstrosity using pipe
and curry
.
tz.reduce(lambda x, y: x + y**2, tz.drop(5, tz.filter(lambda x: x % 2 == 0, tz.take(20, tz.map(lambda x: x**2, it.count())))))
In [5]:
tz.reduce(lambda x, y: x + y**2, tz.drop(5, tz.filter(lambda x: x % 2 == 0, tz.take(20, tz.map(lambda x: x**2, it.count())))))
Out[5]:
229764
In [6]:
tz.pipe(
it.count(),
c.map(lambda x: x**2),
c.take(20),
c.filter(lambda x: x % 2 == 0),
c.drop(5),
c.reduce(lambda x, y: x + y**2)
)
Out[6]:
229764
2. How could you code this using only the Python standard library?
In [7]:
xs = it.count()
xs = (x**2 for x in xs)
xs = it.islice(xs, 20)
xs = (x for x in xs if x % 2 == 0)
xs = it.islice(xs, 5, None)
xs = sum(x**2 for x in xs)
xs
Out[7]:
239664
3. Write a function to calculate the running mean of a possibly infinite stream of numbers.
In [36]:
def rmean(seq):
s = 0
n = 0
for x in seq:
s += x
n += 1
yield s/n
In [38]:
list(tz.take(10, rmean(it.count())))
Out[38]:
[0.0, 0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5]
4. Write a function for running mean to update in mini-batches of
size n
In [42]:
def rmean_mb(size, seq):
s = 0
n = 0
for x in tz.partition_all(size, seq):
s += sum(x)
n += len(x)
yield s/n
In [43]:
list(tz.take(10, rmean_mb(3, it.count())))
Out[43]:
[1.0, 2.5, 4.0, 5.5, 7.0, 8.5, 10.0, 11.5, 13.0, 14.5]
5. Count the frequencies of 2-tuples formed by reading from all
files with name numbers???.txt
one line at a time, ignoring comment
lines, and including tuples that wrap around the end of the line. The
numbers???.txt
files can be generated using the code below.
In [8]:
num_files = 3
num_lines = 100000
for i in range(num_files):
with open('numbers%03d.txt' % i, 'w') as f:
for j in range(num_lines):
if np.random.rand() < 0.3:
items = '#'
else:
items = np.random.randint(0, 5, 10)
f.write('\t'.join(map(str, items)) + '\n')
In [28]:
def freqs(paths):
ans = tz.pipe(
paths,
glob,
c.map(open),
tz.concat,
c.filter(lambda line: not line.startswith('#')),
c.map(str.split),
tz.concat,
c.map(int),
c.sliding_window(2),
tz.frequencies
)
return ans
In [29]:
freqs('numbers???.txt')
Out[29]:
{(0, 0): 84268,
(0, 1): 83326,
(0, 2): 84399,
(0, 3): 84153,
(0, 4): 83620,
(1, 0): 83723,
(1, 1): 83756,
(1, 2): 83952,
(1, 3): 83608,
(1, 4): 83269,
(2, 0): 83858,
(2, 1): 84105,
(2, 2): 84428,
(2, 3): 84063,
(2, 4): 83778,
(3, 0): 84404,
(3, 1): 83750,
(3, 2): 84055,
(3, 3): 84015,
(3, 4): 84198,
(4, 0): 83513,
(4, 1): 83370,
(4, 2): 83398,
(4, 3): 84583,
(4, 4): 83507}
6. Find line numbers of comments in fot.txt
.
In [31]:
%%file foo.txt
1,2
3,4
5,6 # first comment
7,8
9,10,
11,12 # second comment
Writing foo.txt
Option 1: Using pipe
In [48]:
def find_comment_line_numbers_1(filename):
ans = tz.pipe(
filename,
open,
enumerate,
c.filter(lambda x: '#' in x[1]),
c.map(lambda x: x[0])
)
return ans
In [49]:
ans = find_comment_line_numbers_1('foo.txt')
list(ans)
Out[49]:
[2, 5]
Option 2: Using regular Python
In [46]:
def find_comment_line_numbers_2(filename):
with open(filename) as f:
for i, line in enumerate(f):
if '#' in line:
yield i
In [47]:
ans = find_comment_line_numbers_2('foo.txt')
list(ans)
Out[47]:
[2, 5]
In [ ]: