%matplotlib inline
import time
import datetime
import matplotlib.pyplot as plt
import numpy as np

Working with large data sets¶

Lazy evaluation, pure functions and higher order functions¶

Lazy and eager evaluation¶

A list comprehension is eager.

[x*x for x in range(3)]
[0, 1, 4]

A generator expression is lazy.

(x*x for x in range(3))
<generator object <genexpr> at 0x11d3fca40>

You can use generators as iterators.

g = (x*x for x in range(3))
next(g)
0
next(g)
1
next(g)
4
next(g)
---------------------------------------------------------------------------

StopIteration                             Traceback (most recent call last)

<ipython-input-8-5f315c5de15b> in <module>()
----> 1 next(g)

StopIteration:

A generator is single use.

for i in g:
print(i, end=", ")
g = (x*x for x in range(3))
for i in g:
print(i, end=", ")
0, 1, 4,

The list constructor forces evaluation of the generator.

list(x*x for x in range(3))
[0, 1, 4]

An eager function.

def eager_updown(n):
xs = []
for i in range(n):
xs.append(i)
for i in range(n, -1, -1):
xs.append(i)
return xs
eager_updown(3)
[0, 1, 2, 3, 2, 1, 0]

A lazy generator.

def lazy_updown(n):
for i in range(n):
yield i
for i in range(n, -1, -1):
yield i
lazy_updown(3)
<generator object lazy_updown at 0x11d6bd990>
list(lazy_updown(3))
[0, 1, 2, 3, 2, 1, 0]

Pure and impure functions¶

A pure function is like a mathematical function. Given the same inputs, it always returns the same output, and has no side effects.

def pure(alist):
return [x*x for x in alist]

An impure function has side effects.

def impure(alist):
for i in range(len(alist)):
alist[i] = alist[i]*alist[i]
return alist
xs = [1,2,3]
ys = pure(xs)
print(xs, ys)
[1, 2, 3] [1, 4, 9]
ys = impure(xs)
print(xs, ys)
[1, 4, 9] [1, 4, 9]

Quiz¶

Say if the following functions are pure or impure.

def f1(n):
return n//2 if n % 2==0 else n*3+1
def f2(n):
return np.random.random(n)
def f3(n):
n = 23
return n
def f4(a, n=[]):
n.append(a)
return n

Higher order functions¶

list(map(f1, range(10)))
[0, 4, 1, 10, 2, 16, 3, 22, 4, 28]
list(filter(lambda x: x % 2 == 0, range(10)))
[0, 2, 4, 6, 8]
from functools import reduce
reduce(lambda x, y: x + y, range(10), 0)
45
reduce(lambda x, y: x + y, [[1,2], [3,4], [5,6]], [])
[1, 2, 3, 4, 5, 6]

Using the operator module¶

The operator module provides all the Python operators as functions.

import operator as op
reduce(op.mul, range(1, 6), 1)
120
list(map(op.itemgetter(1), [[1,2,3],[4,5,6],[7,8,9]]))
[2, 5, 8]

Using itertools¶

import itertools as it
list(it.combinations(range(1,6), 3))
[(1, 2, 3),
(1, 2, 4),
(1, 2, 5),
(1, 3, 4),
(1, 3, 5),
(1, 4, 5),
(2, 3, 4),
(2, 3, 5),
(2, 4, 5),
(3, 4, 5)]

Generate all Boolean combinations

list(it.product([0,1], repeat=3))
[(0, 0, 0),
(0, 0, 1),
(0, 1, 0),
(0, 1, 1),
(1, 0, 0),
(1, 0, 1),
(1, 1, 0),
(1, 1, 1)]
[0, 2, 4, 6, 8]
list(it.takewhile(lambda x: x < 3, range(10)))
[0, 1, 2]
data = sorted('the quick brown fox jumps over the lazy dog'.split(), key=len)
for k, g in it.groupby(data, key=len):
print(k, list(g))
3 ['the', 'fox', 'the', 'dog']
4 ['over', 'lazy']
5 ['quick', 'brown', 'jumps']

Using toolz¶

! pip install toolz
Requirement already satisfied: toolz in /Users/cliburn/anaconda2/envs/p3/lib/python3.5/site-packages
import toolz as tz
list(tz.partition(3, range(10)))
[(0, 1, 2), (3, 4, 5), (6, 7, 8)]
[(0, 1, 2), (3, 4, 5), (6, 7, 8), (9, None, None)]
n = 30
dna = ''.join(np.random.choice(list('ACTG'), n))
dna
'TGCGTACTTTTCGCTATCCTCTAGTAGTTG'
tz.frequencies(tz.sliding_window(2, dna))
{('A', 'C'): 1,
('A', 'G'): 2,
('A', 'T'): 1,
('C', 'C'): 1,
('C', 'G'): 2,
('C', 'T'): 4,
('G', 'C'): 2,
('G', 'T'): 3,
('T', 'A'): 4,
('T', 'C'): 3,
('T', 'G'): 2,
('T', 'T'): 4}

Using pipes and the curried namespace¶

from toolz import curried as c
tz.pipe(
dna,
c.sliding_window(2), # using curry
c.frequencies,
)
{('A', 'C'): 1,
('A', 'G'): 2,
('A', 'T'): 1,
('C', 'C'): 1,
('C', 'G'): 2,
('C', 'T'): 4,
('G', 'C'): 2,
('G', 'T'): 3,
('T', 'A'): 4,
('T', 'C'): 3,
('T', 'G'): 2,
('T', 'T'): 4}
composed = tz.compose(
c.frequencies,
c.sliding_window(2),
)
composed(dna)
{('A', 'C'): 1,
('A', 'G'): 2,
('A', 'T'): 1,
('C', 'C'): 1,
('C', 'G'): 2,
('C', 'T'): 4,
('G', 'C'): 2,
('G', 'T'): 3,
('T', 'A'): 4,
('T', 'C'): 3,
('T', 'G'): 2,
('T', 'T'): 4}

Processing many sets of DNA strings without reading into memory¶

m = 10000
n = 300
dnas = (''.join(np.random.choice(list('ACTG'), n, p=[.1, .2, .3, .4]))
for i in range(m))
dnas
<generator object <genexpr> at 0x11d6bddb0>
tz.merge_with(sum,
tz.map(
composed,
dnas
)
)
{('A', 'A'): 29994,
('A', 'C'): 59620,
('A', 'G'): 120072,
('A', 'T'): 89211,
('C', 'A'): 59860,
('C', 'C'): 119228,
('C', 'G'): 238914,
('C', 'T'): 179708,
('G', 'A'): 119757,
('G', 'C'): 239565,
('G', 'G'): 478575,
('G', 'T'): 359070,
('T', 'A'): 89249,
('T', 'C'): 179377,
('T', 'G'): 359409,
('T', 'T'): 268391}

Working with out-of-core memory¶

Using memmap¶

You can selectively retrieve parts of numpy arrays stored on disk into memory for processing with memmap.

Memory-mapped files are used for accessing small segments of large files on disk, without reading the entire file into memory. The numpy.memmap can be used anywhere an ndarray is used. The maximum size of a memmap array is limited by what the operating system allows - in particular, this is different on 32 and 64 bit architectures.

Creating the memory mapped file¶

n = 100
filename = 'random.dat'
shape = (n, 1000, 1000)

# create memmap
fp = np.memmap(filename, dtype='float64', mode='w+', shape=shape)

# store some data in it
for i in range(n):
x = np.random.random(shape[1:])
fp[i] = x

# flush to disk and remove file handler
del fp

Usign the memory mapped file¶

fp = np.memmap(filename, dtype='float64', mode='r', shape=shape)

# only one block is retreived into memory at a time
start = time.time()
xs = [fp[i].mean() for i in range(n)]
elapsed = time.time() - start

print(np.mean(xs), 'Total: %.2fs Per file: %.2fs' % (elapsed, elapsed/n))
0.500001143637 Total: 0.64s Per file: 0.01s

Using HDF5¶

HDF5 is a hierarchical file format that allows selective disk reads, but also provides a tree structure for organizing your data sets. It can also include metadata annotation for documentation. Because of its flexibility, you should seriously consider using HDF5 for your data storage needs.

I suggest using the python package h5py for working with HDF5 files. See documentation.

import h5py

Creating an HDF5 file¶

%%time

n = 5
filename = 'random.hdf5'
shape = (n, 1000, 1000)

groups = ['Sim%02d' % i for i in range(5)]

with h5py.File(filename, 'w') as f:
# Create hierarchical group structure
for group in groups:
g = f.create_group(group)
g.attrs['created'] = str(datetime.datetime.now())

# Save 100 arrays in each group
for i in range(n):
x = np.random.random(shape[1:])
dset = g.create_dataset('x%06d' % i, shape=x.shape)
dset[:] = x
dset.attrs['created'] = str(datetime.datetime.now())
CPU times: user 701 ms, sys: 124 ms, total: 825 ms
Wall time: 920 ms

Using an HDF5 file¶

f = h5py.File('random.hdf5', 'r')

The HDF5 objects can be treated like dictionaries.

for name in f:
print(name)
Sim00
Sim01
Sim02
Sim03
Sim04
for key in f.keys():
print(key)
Sim00
Sim01
Sim02
Sim03
Sim04
sim1 = f.get('Sim01')
list(sim1.keys())[:5]
['x000000', 'x000001', 'x000002', 'x000003', 'x000004']

Or recursed through like trees

f.visit(lambda x: print(x))
Sim00
Sim00/x000000
Sim00/x000001
Sim00/x000002
Sim00/x000003
Sim00/x000004
Sim01
Sim01/x000000
Sim01/x000001
Sim01/x000002
Sim01/x000003
Sim01/x000004
Sim02
Sim02/x000000
Sim02/x000001
Sim02/x000002
Sim02/x000003
Sim02/x000004
Sim03
Sim03/x000000
Sim03/x000001
Sim03/x000002
Sim03/x000003
Sim03/x000004
Sim04
Sim04/x000000
Sim04/x000001
Sim04/x000002
Sim04/x000003
Sim04/x000004

Retrieving data and attributes

sim2 = f.get('Sim02')
sim2.attrs['created']
'2017-03-29 16:19:19.613896'
x = sim2.get('x000003')
print(x.shape)
print(x.dtype)
print(list(x.attrs.keys()))
print(x.attrs['created'])
np.mean(x)
(1000, 1000)
float32
['created']
2017-03-29 16:19:19.749793
0.49980888
f.close()

Using SQLite3¶

When data is on a relational database, it is useful to do as much preprocessing as possible using SQL - this will be performed using highly efficient compiled routines on the (potentially remote) computer where the database exists.

Here we will use SQLite3 together with pandas to summarize a (potentially) large database.

import pandas as pd
from sqlalchemy import create_engine
engine = create_engine('sqlite:///data/movies.db')
q = '''
SELECT year, count(*) as number
FROM data
GROUP BY year
ORDER BY number DESC
'''

# The coounting, grouping and sorting is done by the database, not pandas
# So this query will work even if the movies dataabse is many terabytes in size
df = pd.read_sql_query(q, engine)
year number
0 1996 1375
1 2000 1365
2 1998 1360
3 2002 1360
4 1997 1335

Out-of-memory data conversions¶

There is a convenient Python package called odo that will convert data between different formats without having to load all the data into memory first. This allows conversion of potentially huge files.

Odo is a shape shifting character in the Star Trek universe.

! pip install odo
Requirement already satisfied: odo in /Users/cliburn/anaconda2/envs/p3/lib/python3.5/site-packages
Requirement already satisfied: datashape>=0.5.0 in /Users/cliburn/anaconda2/envs/p3/lib/python3.5/site-packages (from odo)
Requirement already satisfied: numpy>=1.7 in /Users/cliburn/anaconda2/envs/p3/lib/python3.5/site-packages (from odo)
Requirement already satisfied: pandas>=0.15.0 in /Users/cliburn/anaconda2/envs/p3/lib/python3.5/site-packages (from odo)
Requirement already satisfied: toolz>=0.7.3 in /Users/cliburn/anaconda2/envs/p3/lib/python3.5/site-packages (from odo)
Requirement already satisfied: multipledispatch>=0.4.7 in /Users/cliburn/anaconda2/envs/p3/lib/python3.5/site-packages (from odo)
Requirement already satisfied: networkx in /Users/cliburn/anaconda2/envs/p3/lib/python3.5/site-packages (from odo)
Requirement already satisfied: python-dateutil in /Users/cliburn/anaconda2/envs/p3/lib/python3.5/site-packages (from datashape>=0.5.0->odo)
Requirement already satisfied: pytz>=2011k in /Users/cliburn/anaconda2/envs/p3/lib/python3.5/site-packages (from pandas>=0.15.0->odo)
Requirement already satisfied: decorator>=3.4.0 in /Users/cliburn/anaconda2/envs/p3/lib/python3.5/site-packages (from networkx->odo)
Requirement already satisfied: six>=1.5 in /Users/cliburn/anaconda2/envs/p3/lib/python3.5/site-packages (from python-dateutil->datashape>=0.5.0->odo)
import odo
odo.odo('sqlite:///data/movies.db::data', 'data/movies.csv')
<odo.backends.csv.CSV at 0x1503c7828>
index,movieId,title,genres,year
2,3,"Grumpier Old Men (1995)",Comedy|Romance,1995
3,4,"Waiting to Exhale (1995)",Comedy|Drama|Romance,1995
4,5,"Father of the Bride Part II (1995)",Comedy,1995
5,6,"Heat (1995)",Action|Crime|Thriller,1995
6,7,"Sabrina (1995)",Comedy|Romance,1995
7,8,"Tom and Huck (1995)",Adventure|Children,1995
8,9,"Sudden Death (1995)",Action,1995

Probabilistic data structures¶

A data sketch is a probabilistic algorithm or data structure that approximates some statistic of interest, typically using very little memory and processing time. Often they are applied to streaming data, and so must be able to incrementally process data. Many data sketches make use of hash functions to distribute data into buckets uniformly. Typically, data sketches have the following desirable properties

• sub-linear in space
• single scan
• can be parallelized
• can be combined (merge)

Examples where counting distinct values is useful:

• number of unique users in a Twitter stream
• number of distinct records to be fetched by a database query
• number of unique IP addresses accessing a website
• number of distinct queries submitted to a search engine
• number of distinct DNA motifs in genomics data sets (e.g. microbiome)

Packages for data sketches in Python are relatively immature, and if you are interested, you could make a large contribution by creating a comprehensive open source library of data sketches in Python.

HyperLogLog¶

Counting the number of distinct elements exactly requires storage of all distinct elements (e.g. in a set) and hence grows with the cardinality $$n$$. Probabilistic data structures known as Distinct Value Sketches can do this with a tiny and fixed memory size.

A hash function takes data of arbitrary size and converts it into a number in a fixed range. Ideally, given an arbitrary set of data items, the hash function generates numbers that follow a uniform distribution within the fixed range. Hash functions are immensely useful throughout computer science (for example - they power Python sets and dictionaries), and especially for the generation of probabilistic data structures.

The binary digits in a (say) 32-bit hash are effectively random, and equivalent to a sequence of fair coin tosses. Hence the probability that we see a run of 5 zeros in the smallest hash so far suggests that we have added $$2^5$$ unique items so far. This is the intuition behind the loglog family of Distinct Value Sketches. Note that the biggest count we can track with 32 bits is $$2^{32} = 4294967296$$.

The accuracy of the sketch can be improved by averaging results with multiple coin flippers. In practice, this is done by using the first $$k$$ bit registers to identify $$2^k$$ different coin flippers. Hence, the max count is now $$2 ** (32 - k)$$. The hyperloglog algorithm uses the harmonic mean of the $$2^k$$ flippers which reduces the effect of outliers and hence the variance of the estimate.

! pip install hyperloglog
from hyperloglog import HyperLogLog

Compare unique counts with set and hyperloglog¶

def flatten(xs):
return (x for sublist in xs for x in sublist)

def error(a, b, n):
return abs(len(a) - len(b))/n

print('True\t\tHLL\t\tRel Error')
with open('data/Ulysses.txt') as f:
word_list = flatten(line.split() for line in f)

s = set([])
hll = HyperLogLog(error_rate=0.01)
for i, word in enumerate(word_list):
if i%int(.2e5)==0:
print('%8d\t%8d\t\t%.3f' %
(len(s), len(hll),
0 if i==0 else error(s, hll, i)))
True                HLL             Rel Error
1           1                0.000
7095        7151                0.003
11582       11667                0.002
16010       16202                0.003
20058       20296                0.003
23761       24087                0.003
27463       27785                0.003
29780       30173                0.003
33740       34206                0.003
38070       38473                0.002
41599       42235                0.003
44119       44619                0.002
48549       49197                0.003
49511       50191                0.003

Bloom filters¶

Bloom filters are designed to answer queries about whether a specific item is in a collection. If the answer is NO, then it is definitive. However, if the answer is yes, it might be a false positive. The possibility of a false positive makes the Bloom filter a probabilistic data structure.

A bloom filter consists of a bit vector of length $$k$$ initially set to zero, and $$n$$ different hash functions that return a hash value that will fall into one of the $$k$$ bins. In the construction phase, for every item in the collection, $$n$$ hash values are generated by the $$n$$ hash functions, and every position indicated by a hash value is flipped to one. In the query phase, given an item, $$n$$ hash values are calculated as before - if any of these $$n$$ positions is a zero, then the item is definitely not in the collection. However, because of the possibility of hash collisions, even if all the positions are one, this could be a false positive. Clearly, the rate of false positives depends on the ratio of zero and one bits, and there are Bloom filter implementations that will dynamically bound the ratio and hence the false positive rate.

Possible uses of a Bloom filter include:

• Does a particular sequence motif appear in a DNA string?
• Has this book been recommended to this customer before?
• Check if an element exists on disk before performing I/O
• Check if URL is a potential malware site using in-browser Bloom filter to minimize network communication
• As an alternative way to generate distinct value counts cheaply (only increment count if Bloom filter says NO)
! pip install git+https://github.com/jaybaird/python-bloomfilter.git
Collecting git+https://github.com/jaybaird/python-bloomfilter.git
Cloning https://github.com/jaybaird/python-bloomfilter.git to /private/var/folders/xf/rzdg30ps11g93j3w0h589q780000gn/T/pip-4g7j8fys-build
Requirement already satisfied (use --upgrade to upgrade): pybloom==2.0.0 from git+https://github.com/jaybaird/python-bloomfilter.git in /Users/cliburn/anaconda2/envs/p3/lib/python3.5/site-packages
Requirement already satisfied: bitarray>=0.3.4 in /Users/cliburn/anaconda2/envs/p3/lib/python3.5/site-packages (from pybloom==2.0.0)
from pybloom import ScalableBloomFilter
# The Scalable Bloom Filter grows as needed to keep the error rate small
sbf = ScalableBloomFilter(error_rate=0.001)
with open('data/Ulysses.txt') as f:
word_set = set(flatten(line.split() for line in f))
for word in word_set:

Ask Bloom filter if test words were in Ulysses¶

test_words = ['banana', 'artist', 'Dublin', 'masochist', 'Obama']
for word in test_words:
print(word, word in sbf)
banana True
artist True
Dublin True
masochist False
Obama False
for word in test_words:
print(word, word in word_set)
banana True
artist True
Dublin True
masochist False
Obama False

Small-scale distributed programming¶

For data sets that are not too big (say up to 1 TB), it is typically sufficient to process on a single workstation. The package dask provides 3 data structures that mimic regular Python data structures but perform computation in a distributed way allowing you to make optimal use of multiple cores easily.

These structures are

• dask array ~ numpy array
• dask bag ~ Python dictionary
• dask dataframe ~ pandas dataframe

From the official documentation,

Dask is a simple task scheduling system that uses directed acyclic graphs (DAGs) of tasks to break up large computations into many small ones.

Dask enables parallel computing through task scheduling and blocked algorithms. This allows developers to write    complex parallel algorithms and execute them in parallel either on a modern multi-core machine or on a distributed cluster.

On a single machine dask increases the scale of comfortable data from fits-in-memory to fits-on-disk by intelligently streaming data from disk and by leveraging all the cores of a modern CPU.
! pip install dask
import dask.array as da
import dask.bag as db
import dask.dataframe as dd

These behave like numpy arrays, but break a massive job into tasks that are then executed by a scheduler. The default scheduler uses threading but you can also use multiprocessing or distributed or even serial processing (mainly for debugging). You can tell the dask array how to break the data into chunks for processing.

From official documents

For performance, a good choice of chunks follows the following rules:

A chunk should be small enough to fit comfortably in memory. We’ll have many chunks in memory at once.
A chunk must be large enough so that computations on that chunk take significantly longer than the 1ms overhead per task that dask scheduling incurs. A task should take longer than 100ms.
Chunks should align with the computation that you want to do. For example if you plan to frequently slice along a particular dimension then it’s more efficient if your chunks are aligned so that you have to touch fewer chunks. If you want to add two arrays then its convenient if those arrays have matching chunks patterns.
# We resuse the 100 * 1000 * 1000 random numbers in the memmap file on disk
n = 100
filename = 'random.dat'
shape = (n, 1000, 1000)
fp = np.memmap(filename, dtype='float64', mode='r', shape=shape)

# We can decide on the chunk size to be distributed for computing
xs = [da.from_array(fp[i], chunks=(200,500)) for i in range(n)]
xs = da.concatenate(xs)
avg = xs.mean().compute()
avg
0.50000114363730053
# Typically we store Dask arrays inot HDF5

da.to_hdf5('data/xs.hdf5', '/foo/xs', xs)
with h5py.File('data/xs.hdf5', 'r') as f:
print(f.get('/foo/xs').shape)
(100000, 1000)

Dask dataframes can treat multiple pandas dataframes that might not simultaneously fit into memory like a single dataframe. See use of globbing to specify multiple source files.

for i in range(5):
f = 'data/x%03d.csv' % i
np.savetxt(f, np.random.random((1000, 5)), delimiter=',')
print(df.describe().compute())
0            1            2            3            4
count  5000.000000  5000.000000  5000.000000  5000.000000  5000.000000
mean      0.492787     0.498913     0.506285     0.497849     0.496456
std       0.290051     0.290067     0.289115     0.287252     0.288440
min       0.000120     0.000127     0.001602     0.000124     0.000028
25%       0.270114     0.261522     0.267837     0.262800     0.262552
50%       0.511363     0.525779     0.522839     0.515185     0.511809
75%       0.762856     0.755237     0.761059     0.761570     0.772811
max       0.999914     0.999940     0.999997     0.999718     0.999783

Dask bags work like dictionaries for unstructured or semi-structured data sets, typically over many files.

The AA subdirectory consists of 101 1 MB plain text files from the English Wikipedia¶

%%time

words = text.str.split().concat().frequencies().topk(10, key=lambda x: x)
top10 = words.compute()
CPU times: user 1min 12s, sys: 2.27 s, total: 1min 14s
Wall time: 1min 27s
print(top10)
[('the', 1051994), ('of', 617239), ('and', 482039), ('in', 370266), ('to', 356495), ('a', 312597), ('is', 174145), ('as', 145215), ('was', 141788), ('The', 141724)]

This is slow because of disk access. Fix by changing scheduler to work asynchronously.

%%time

words = text.str.split().concat().frequencies().topk(10, key=lambda x: x)
top10 = words.compute(get = dask.async.get_sync)
CPU times: user 11.1 s, sys: 413 ms, total: 11.6 s
Wall time: 11.8 s
print(top10)
[('the', 1051994), ('of', 617239), ('and', 482039), ('in', 370266), ('to', 356495), ('a', 312597), ('is', 174145), ('as', 145215), ('was', 141788), ('The', 141724)]

Conversion from bag to dataframe¶

import string
freqs = (text.
str.translate({ord(char): None for char in string.punctuation}).
str.lower().
str.split().
concat().
frequencies())
Get the top 5 words sorted by key (not value)¶
[('🚲', 1), ('𝛿', 2), ('𓏘𓃭𓇋𓍯𓊪𓄿𓂧𓂋𓄿𓏏𓆇', 1), ('𒀸𒋗𒁺', 1), ('𐑅', 1)]
df_freqs = freqs.to_dataframe(columns=['word', 'n'])
word n
0 €53 1
1 doodnath 1
2 iphone 82
3 flagged 3
4 desks 10

The compute method converts to a regular pandas dataframe¶

For data sets that fit in memory, pandas is faster and allows some operations like sorting that are not provided by dask dataframes.

df = df_freqs.compute()