Biggish Data

We shall discuss libraries that are useful when your data is too big to fit in memory, but probably not big enough to justify the added complexity of moving to a cluster. With todays technology, this includes data sets of approximately 10s to 100s of gigabytes in size.

One goal is to introduce the storage library h5py and the pydata packages odo, dask and blaze.

Packages to install

pip install dask
pip install cloudpickle
pip install graphviz
pip install memory_profiler
import time
import string
import h5py
import dask
import dask.bag as db
import dask.dataframe as dd
import dask.array as da
from dask.dot import dot_graph
from dask.imperative import do, value
from odo import odo, drop, discover
from blaze import dshape, Data, by

1 billion numbers

We first create 1 billion numbers in \(1000 \times 1000\) blocks and save each block in binary format.

def save(i, clobber=False):
    fn = 'x%06d.npy' % i
    if clobber or not os.path.exists(fn):
        x = np.random.random((1000,1000))
        np.save(fn, x)
    return i
def save_csv(i, clobber=False):
    fn = 'x%06d.csv' % i
    if clobber or not os.path.exists(fn):
        x = np.random.random((1000,1000))
        np.savetxt(fn, x, delimiter=',', fmt='%d')
    return i
%timeit save(0, clobber=True)
10 loops, best of 3: 63.8 ms per loop
%timeit save_csv(0, clobber=True)
1 loops, best of 3: 316 ms per loop

Write to disk in parallel with concurrent.futures

from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as pool:
    pool.map(save, range(1000))

Memory usage

%load_ext memory_profiler
The memory_profiler extension is already loaded. To reload it, use:
  %reload_ext memory_profiler
%memit x = np.load('x%06d.npy' % 0)
peak memory: 633.05 MiB, increment: 7.64 MiB
%memit x = np.loadtxt('x%06d.csv' % 0, delimiter=',')
peak memory: 652.47 MiB, increment: 27.05 MiB

Loading and proecessing times for a single file

def process_one(f):
    x = np.load(f)
    return x.mean()
n = 100
start = time.time()
xs =[process_one('x%06d.npy' % i) for i in range(n)]
elapsed = time.time() - start
print(np.mean(xs), 'Total: %.2fs Per file: %.2fs' % (elapsed, elapsed/n))
99.0030391889 Total: 3.31s Per file: 0.03s

Saving multiple numpy arrays to a single file

Using savez and savez_compressed

n = 100
np.savez('xs.npz', *(np.random.random((1000,1000))
         for i in range(n)))
xs = np.load('xs.npz')
xs.keys()[:3]
['arr_32', 'arr_19', 'arr_81']
xs['arr_0'].mean()
0.5000693251536017

Serial code / memmap

Memory-mapped files are used for accessing small segments of large files on disk, without reading the entire file into memory. The numpy.memmap can be used anywhere an ndarray is used. The maximum size of a memmap array is 2 GB.

n = 100
filename = 'random.dat'
shape = (n, 1000, 1000)
if not os.path.exists(filename):
    fp = np.memmap(filename, dtype='float64', mode='w+',
                   shape=shape) # create memmap
    for i in range(n):
        x = np.load('x%06d.npy' % i)
        fp[i] = x
    del fp # Deletion flushes to disk before removing the object
fp = np.memmap(filename, dtype='float64', mode='r', shape=shape) # get handle to memmap
n = 100
start = time.time()
xs = [fp[i].mean() for i in range(n)]
elapsed = time.time() - start
print(np.mean(xs), 'Total: %.2fs Per file: %.2fs' % (elapsed, elapsed/n))
99.0030391889 Total: 0.62s Per file: 0.01s

Serial code / HDF5 file

HDF5 is a hierarchical file format that allows selective disk reads, but also provides a tree structure for organizing your data sets. It also does not have the size limitation of memmap and can include metadata annotation for documentation. Because of its flexibility, you should seriously consider using HDF5 for your data storage needs.

I suggest using the python package h5py for working wiht HDF5 files. See documentation.

Load data into HDF5 file

import datetime
%%time
n = 100
if not os.path.exists('random.hdf5'):
    with h5py.File('random.hdf5', 'w') as f:
        for i in range(n):
            x = np.load('x%06d.npy' % i)
            dset = f.create_dataset('x%06d' % i, shape=x.shape)
            dset[:] = x
            dset.attrs['created'] = str(datetime.datetime.now())
CPU times: user 898 ms, sys: 1.19 s, total: 2.09 s
Wall time: 2.24 s
with h5py.File('random.hdf5', 'r') as f:
    f.visit(lambda x: print(x, f[x].shape, f[x].attrs['created']))
x000000 (1000, 1000) 2016-04-14 09:58:14.624532
x000001 (1000, 1000) 2016-04-14 09:58:14.644895
x000002 (1000, 1000) 2016-04-14 09:58:14.679152
x000003 (1000, 1000) 2016-04-14 09:58:14.700687
x000004 (1000, 1000) 2016-04-14 09:58:14.723214
x000005 (1000, 1000) 2016-04-14 09:58:14.747455
x000006 (1000, 1000) 2016-04-14 09:58:14.772302
x000007 (1000, 1000) 2016-04-14 09:58:14.796710
x000008 (1000, 1000) 2016-04-14 09:58:14.821258
x000009 (1000, 1000) 2016-04-14 09:58:14.842970
x000010 (1000, 1000) 2016-04-14 09:58:14.863529
x000011 (1000, 1000) 2016-04-14 09:58:14.884584
x000012 (1000, 1000) 2016-04-14 09:58:14.905112
x000013 (1000, 1000) 2016-04-14 09:58:14.928326
x000014 (1000, 1000) 2016-04-14 09:58:14.953344
x000015 (1000, 1000) 2016-04-14 09:58:14.978183
x000016 (1000, 1000) 2016-04-14 09:58:15.001180
x000017 (1000, 1000) 2016-04-14 09:58:15.021939
x000018 (1000, 1000) 2016-04-14 09:58:15.042833
x000019 (1000, 1000) 2016-04-14 09:58:15.067651
x000020 (1000, 1000) 2016-04-14 09:58:15.088946
x000021 (1000, 1000) 2016-04-14 09:58:15.109024
x000022 (1000, 1000) 2016-04-14 09:58:15.133326
x000023 (1000, 1000) 2016-04-14 09:58:15.154661
x000024 (1000, 1000) 2016-04-14 09:58:15.180672
x000025 (1000, 1000) 2016-04-14 09:58:15.202285
x000026 (1000, 1000) 2016-04-14 09:58:15.227149
x000027 (1000, 1000) 2016-04-14 09:58:15.250007
x000028 (1000, 1000) 2016-04-14 09:58:15.271642
x000029 (1000, 1000) 2016-04-14 09:58:15.292183
x000030 (1000, 1000) 2016-04-14 09:58:15.313753
x000031 (1000, 1000) 2016-04-14 09:58:15.334887
x000032 (1000, 1000) 2016-04-14 09:58:15.357399
x000033 (1000, 1000) 2016-04-14 09:58:15.378999
x000034 (1000, 1000) 2016-04-14 09:58:15.400710
x000035 (1000, 1000) 2016-04-14 09:58:15.425733
x000036 (1000, 1000) 2016-04-14 09:58:15.448780
x000037 (1000, 1000) 2016-04-14 09:58:15.470910
x000038 (1000, 1000) 2016-04-14 09:58:15.491796
x000039 (1000, 1000) 2016-04-14 09:58:15.513199
x000040 (1000, 1000) 2016-04-14 09:58:15.538417
x000041 (1000, 1000) 2016-04-14 09:58:15.559718
x000042 (1000, 1000) 2016-04-14 09:58:15.585009
x000043 (1000, 1000) 2016-04-14 09:58:15.605381
x000044 (1000, 1000) 2016-04-14 09:58:15.626484
x000045 (1000, 1000) 2016-04-14 09:58:15.647832
x000046 (1000, 1000) 2016-04-14 09:58:15.669104
x000047 (1000, 1000) 2016-04-14 09:58:15.690210
x000048 (1000, 1000) 2016-04-14 09:58:15.710586
x000049 (1000, 1000) 2016-04-14 09:58:15.731448
x000050 (1000, 1000) 2016-04-14 09:58:15.752450
x000051 (1000, 1000) 2016-04-14 09:58:15.773169
x000052 (1000, 1000) 2016-04-14 09:58:15.794386
x000053 (1000, 1000) 2016-04-14 09:58:15.819318
x000054 (1000, 1000) 2016-04-14 09:58:15.840124
x000055 (1000, 1000) 2016-04-14 09:58:15.861567
x000056 (1000, 1000) 2016-04-14 09:58:15.882513
x000057 (1000, 1000) 2016-04-14 09:58:15.906563
x000058 (1000, 1000) 2016-04-14 09:58:15.932313
x000059 (1000, 1000) 2016-04-14 09:58:15.952882
x000060 (1000, 1000) 2016-04-14 09:58:15.973459
x000061 (1000, 1000) 2016-04-14 09:58:15.994462
x000062 (1000, 1000) 2016-04-14 09:58:16.017134
x000063 (1000, 1000) 2016-04-14 09:58:16.041931
x000064 (1000, 1000) 2016-04-14 09:58:16.062847
x000065 (1000, 1000) 2016-04-14 09:58:16.083855
x000066 (1000, 1000) 2016-04-14 09:58:16.105141
x000067 (1000, 1000) 2016-04-14 09:58:16.126746
x000068 (1000, 1000) 2016-04-14 09:58:16.148961
x000069 (1000, 1000) 2016-04-14 09:58:16.174063
x000070 (1000, 1000) 2016-04-14 09:58:16.195580
x000071 (1000, 1000) 2016-04-14 09:58:16.216251
x000072 (1000, 1000) 2016-04-14 09:58:16.236949
x000073 (1000, 1000) 2016-04-14 09:58:16.259226
x000074 (1000, 1000) 2016-04-14 09:58:16.284231
x000075 (1000, 1000) 2016-04-14 09:58:16.303796
x000076 (1000, 1000) 2016-04-14 09:58:16.324429
x000077 (1000, 1000) 2016-04-14 09:58:16.349299
x000078 (1000, 1000) 2016-04-14 09:58:16.372384
x000079 (1000, 1000) 2016-04-14 09:58:16.395142
x000080 (1000, 1000) 2016-04-14 09:58:16.416727
x000081 (1000, 1000) 2016-04-14 09:58:16.436739
x000082 (1000, 1000) 2016-04-14 09:58:16.460696
x000083 (1000, 1000) 2016-04-14 09:58:16.483000
x000084 (1000, 1000) 2016-04-14 09:58:16.503930
x000085 (1000, 1000) 2016-04-14 09:58:16.525078
x000086 (1000, 1000) 2016-04-14 09:58:16.545269
x000087 (1000, 1000) 2016-04-14 09:58:16.565704
x000088 (1000, 1000) 2016-04-14 09:58:16.586277
x000089 (1000, 1000) 2016-04-14 09:58:16.608225
x000090 (1000, 1000) 2016-04-14 09:58:16.629807
x000091 (1000, 1000) 2016-04-14 09:58:16.649905
x000092 (1000, 1000) 2016-04-14 09:58:16.674557
x000093 (1000, 1000) 2016-04-14 09:58:16.695010
x000094 (1000, 1000) 2016-04-14 09:58:16.715820
x000095 (1000, 1000) 2016-04-14 09:58:16.736731
x000096 (1000, 1000) 2016-04-14 09:58:16.757302
x000097 (1000, 1000) 2016-04-14 09:58:16.778410
x000098 (1000, 1000) 2016-04-14 09:58:16.798878
x000099 (1000, 1000) 2016-04-14 09:58:16.819455
n = 100
start = time.time()
with h5py.File('random.hdf5', 'r') as f:
    xs = [np.mean(f['x%06d' % i]) for i in range(n)]
elapsed = time.time() - start
print(np.mean(xs), 'Total: %.2fs Per file: %.2fs' % (elapsed, elapsed/n))
99.003 Total: 0.58s Per file: 0.01s

Using Dask

From the official documentation,

Dask is a simple task scheduling system that uses directed acyclic graphs (DAGs) of tasks to break up large computations into many small ones.

Dask enables parallel computing through task scheduling and blocked algorithms. This allows developers to write    complex parallel algorithms and execute them in parallel either on a modern multi-core machine or on a distributed cluster.

On a single machine dask increases the scale of comfortable data from fits-in-memory to fits-on-disk by intelligently streaming data from disk and by leveraging all the cores of a modern CPU.

The model for how Dask works is quite similar to Spark, and we will see the same features

  • lazy data structures and functions
  • functional style of chaining computations and use of higher order functions
  • trigger evaluations by actions
  • convenience wrappers for possibly dispersed data that mimic numpy arrays, dicts and pandas dataframes

These behave like numpy arrays, but break a massive job into tasks that are then executed by a scheduler. The default scheduler uses threading but you can also use multiprocessing or distributed or even serial processing (mainly for debugging).

n = 100
start = time.time()
with h5py.File('random.hdf5', 'r') as f:
    xs = [da.from_array(f['x%06d' % i], chunks=(1000,1000)) for i in range(n)]
    xs = da.concatenate(xs)
    avg = xs.mean().compute()
elapsed = time.time() - start
print(avg, 'Total: %.2fs Per file: %.2fs' % (elapsed, elapsed/n))
99.0030391889 Total: 0.96s Per file: 0.01s

Dask bags work like dictionaries for unstructured or semi-structured data sets, typically over many files.

The AA subdirectory consists of 101 1 MB plain text files from the English Wikipedia

b = db.from_filenames('data/wiki/AA/*')
start = time.time()
words = b.str.split().concat().frequencies().topk(10, key=lambda x: x[1])
top10 = words.compute()
elapsed = time.time() - start
print(top10, 'Total: %.2fs' % (elapsed, ))
[('the', 1051994), ('of', 617239), ('and', 482039), ('in', 370266), ('to', 356495), ('a', 312597), ('is', 174145), ('as', 145215), ('was', 141788), ('The', 141724)] Total: 90.89s

Change the scheduler

start = time.time()
words = b.str.split().concat().frequencies().topk(10, key=lambda x: x[1])
top10 = words.compute(get = dask.async.get_sync)
elapsed = time.time() - start
print(top10, 'Total: %.2fs' % (elapsed, ))
[('the', 1051994), ('of', 617239), ('and', 482039), ('in', 370266), ('to', 356495), ('a', 312597), ('is', 174145), ('as', 145215), ('was', 141788), ('The', 141724)] Total: 12.24s

Function chaining

freqs = (b.str.translate({ord(char): None for char in string.punctuation})
          .str.lower()
          .str.split()
          .concat()
          .frequencies())
freqs.take(5)
(('statites', 2),
 ('tubanti', 1),
 ('visualisation', 8),
 ('manualized', 1),
 ('id2328', 1))
freqs.topk(5, key=lambda x: x[1]).compute()
[('the', 1214860),
 ('of', 619481),
 ('and', 487234),
 ('in', 438346),
 ('to', 361966)]

Visualizing the task graph

dot_graph(freqs.dask)
_images/17A_Intermediate_Sized_Data_48_0.png

Dask dataframes

Dask dataframes can treat multiple pandas dataframes that might not simultaneously fit into memory like a single dataframe. See use of globbing to specify multiple source files.

start = time.time()
df = dd.read_csv('x00000*.csv', header=None)
print(df.describe().compute())
elapsed = time.time() - start
print(top10, 'Total: %.2fs' % (elapsed, ))
                0             1             2             3             4    count  10000.000000  10000.000000  10000.000000  10000.000000  10000.000000
mean      99.522200    100.134400     99.460800     99.625000    100.180800
std        9.700707     10.038985     10.037736      9.917672     10.075647
min       69.000000     71.000000     62.000000     70.000000     64.000000
25%       93.000000     93.000000     93.000000     94.000000     93.000000
50%      100.000000    100.000000    100.000000    100.000000    100.000000
75%      106.000000    107.000000    106.000000    107.000000    107.000000
max      131.000000    137.000000    131.000000    137.000000    136.000000

                5             6             7            8             9    count  10000.000000  10000.000000  10000.000000  10000.00000  10000.000000
mean     100.281200    100.254000     99.818600     99.95520    100.009600
std       10.030164      9.826736      9.794064      9.65199     10.176319
min       66.000000     64.000000     72.000000     71.00000     69.000000
25%       93.750000     94.000000     93.750000     94.00000     93.000000
50%      100.000000    100.000000    100.000000    100.00000    100.000000
75%      107.000000    107.000000    107.000000    106.25000    107.000000
max      133.000000    133.000000    136.000000    135.00000    132.000000

           ...                990           991           992           993  count      ...       10000.000000  10000.000000  10000.000000  10000.000000
mean       ...          99.970000    100.148800    100.098000    100.328000
std        ...          10.160444      9.877126      9.931357     10.112559
min        ...          67.000000     67.000000     62.000000     67.000000
25%        ...          93.000000     94.000000     94.000000     93.000000
50%        ...         100.000000    100.000000    100.000000    100.000000
75%        ...         107.000000    108.000000    107.000000    107.000000
max        ...         139.000000    131.000000    136.000000    134.000000

                994           995           996           997           998  count  10000.000000  10000.000000  10000.000000  10000.000000  10000.000000
mean      99.636600     99.956600     99.883600    100.390200     99.722000
std        9.912475      9.999976     10.414283      9.828693      9.839959
min       70.000000     70.000000     66.000000     71.000000     67.000000
25%       93.000000     93.000000     93.000000     94.000000     93.000000
50%      100.000000     99.500000    100.000000    101.000000    100.000000
75%      106.000000    107.000000    107.000000    107.000000    107.000000
max      141.000000    133.000000    137.000000    136.000000    134.000000

                999
count  10000.000000
mean      99.936400
std        9.470276
min       69.000000
25%       94.000000
50%      100.000000
75%      107.000000
max      135.000000

[8 rows x 1000 columns]
[('the', 1051994), ('of', 617239), ('and', 482039), ('in', 370266), ('to', 356495), ('a', 312597), ('is', 174145), ('as', 145215), ('was', 141788), ('The', 141724)] Total: 28.32s

Converting bags to dataframes

df_freqs = freqs.to_dataframe(columns=['word', 'n'])
df_freqs.head(10)
word n
0 statites 2
1 tubanti 1
2 visualisation 8
3 id1872 1
4 id2328 1
5 rolphton 1
6 enko 2
7 400–500 3
8 technique—known 1
9 komatiites 1

Dask Imperative

Sometimes you need to run custom functions that don’t fit into the array, bag or dataframe abstractions. Dask provides the imperative module for this purpose with two decorators: do that wraps a function and value that wraps classes. Apart from decorators and the need to call compute for evaluation, you just write regular Python code - yet it can take advantage of the Dask scheduling machinery. Note that the for loop simply builds up a graph of necessary computations - no computation is actually done until compute is called.

@do
def load(filename):
    with open(filename) as f:
        return f.read()

@do
def clean(data):
    return (data
            .translate({ord(char): None for char in string.punctuation})
            .lower()
            )
@do
def analyze(sequence_of_data):
    wc = {}
    for data in sequence_of_data:
        words = data.split()
        for word in words:
            wc[word] = wc.get(word, 0) + 1
    return wc

@do
def top_k(counts, k, **kwargs):
    return sorted(counts.items(), reverse = True, **kwargs)[:k]
files = glob.glob('/Volumes/HD4/data/wiki/extracted/AA/*')[:3]
loaded = [load(i) for i in files]
cleaned = [clean(i) for i in loaded]
analyzed = analyze(cleaned)
top5 = top_k(analyzed, 5)

top5.compute()
[('주판', 1), ('주산', 1), ('수판', 1), ('ㄢ', 1), ('ㄌㄨㄢ', 1)]
top_k(analyzed, 5, key=lambda x: x[1]).compute()
[('the', 36659), ('of', 19458), ('and', 15522), ('in', 13509), ('to', 10843)]

Using Blaze

Blaze also works on heterogeneous data sets, and provides a high-level consistent interface for working with data from mulitple sources. Under the hood, blaze may make use of odo, dask and pandas. Using blaze is very similar to usage pandas. See official documentation.

See description at http://seanlahman.com/files/database/readme58.txt

import urllib.request

url = 'https://github.com/jknecht/baseball-archive-sqlite/raw/master/lahman2013.sqlite'
file_name = 'lahman2013.sqlite'
urllib.request.urlretrieve(url, file_name)
('lahman2013.sqlite', <http.client.HTTPMessage at 0x173c9f6a0>)
db = Data('sqlite:///lahman2013.sqlite')
db.fields
['AllstarFull',
 'Appearances',
 'AwardsManagers',
 'AwardsPlayers',
 'AwardsShareManagers',
 'AwardsSharePlayers',
 'Batting',
 'BattingPost',
 'Fielding',
 'FieldingOF',
 'FieldingPost',
 'HallOfFame',
 'Managers',
 'ManagersHalf',
 'Master',
 'Pitching',
 'PitchingPost',
 'Salaries',
 'Schools',
 'SchoolsPlayers',
 'SeriesPost',
 'Teams',
 'TeamsFranchises',
 'TeamsHalf',
 'temp']
db.Master.head(n=3)
playerID birthYear birthMonth birthDay birthCountry birthState birthCity deathYear deathMonth deathDay deathCountry deathState deathCity nameFirst nameLast nameGiven weight height bats throws debut finalGame retroID bbrefID
0 aardsda01 1981 12 27 USA CO Denver NaN NaN NaN None None None David Aardsma David Allan 205 75 R R 1081227600000.0000000000 1380344400000.0000000000 aardd001 aardsda01
1 aaronha01 1934 2 5 USA AL Mobile NaN NaN NaN None None None Hank Aaron Henry Louis 180 72 R R -496087200000.0000000000 213166800000.0000000000 aaroh101 aaronha01
2 aaronto01 1939 8 5 USA AL Mobile 1984 8 16 USA GA Atlanta Tommie Aaron Tommie Lee 190 75 R R -243885600000.0000000000 54709200000.0000000000 aarot101 aaronto01
master = db.Master
birth = by(master.birthCountry, n=master.birthCountry.count())
birth.sort('n', ascending=False).head(5)
birthCountry n
0 USA 16153
1 D.R. 592
2 Venezuela 301
3 CAN 243
4 P.R. 238