Biggish Data

We shall discuss libraries that are useful when your data is too big to fit in memory, but probably not big enough to justify the added complexity of moving to a cluster. With today's technology, this means data sets of roughly tens to hundreds of gigabytes in size.

One goal is to introduce the storage library h5py and the PyData packages odo, dask and blaze.

In [1]:
import os
import glob
import time
import string
import numpy as np
import h5py
from concurrent.futures import ProcessPoolExecutor
import dask
import dask.bag as db
import dask.dataframe as dd
import dask.array as da
from dask.dot import dot_graph
from dask.imperative import do, value
from odo import odo, drop, discover
from blaze import dshape, Data, by

1 billion numbers

We first create 1 billion numbers in \(1000 \times 1000\) blocks and store them as 1000 CSV files, one block per file.

In [2]:
def save_csv(i):
    """Save a 1000 x 1000 block of Poisson(100) draws as a CSV file."""
    x = np.random.poisson(100, (1000, 1000))
    np.savetxt('x%06d.csv' % i, x, delimiter=',', fmt='%d')
In [3]:
np.random.seed(123)
with ProcessPoolExecutor() as pool:
    pool.map(save_csv, range(1000))

Memory usage

In [4]:
%load_ext memory_profiler
In [5]:
%memit x = np.loadtxt('x%06d.csv' % 0, delimiter=',')
peak memory: 213.06 MiB, increment: 37.47 MiB
In [6]:
def process_one(f):
    """Load one CSV file and return the mean of its values."""
    x = np.loadtxt(f, delimiter=',')
    return x.mean()

Serial code / text file

In [7]:
n = 10
start = time.time()
xs = [process_one('x%06d.csv' % i) for i in range(n)]
elapsed = time.time() - start
print(np.mean(xs), 'Total: %.2fs Per file: %.2fs' % (elapsed, elapsed/n))
100.009394 Total: 21.02s Per file: 2.10s

Parallel code / text file

In [8]:
n = 100
start = time.time()
with ProcessPoolExecutor() as pool:
    xs = list(pool.map(process_one, ('x%06d.csv' % i for i in range(n))))
elapsed = time.time() - start
print(np.mean(xs), 'Total: %.2fs Per file: %.2fs' % (elapsed, elapsed/n))
99.9980644 Total: 29.34s Per file: 0.29s

Serial code / memmap

Memory-mapped files are used for accessing small segments of large files on disk without reading the entire file into memory. A numpy.memmap array can be used anywhere an ndarray is used.

In [3]:
data = np.random.poisson(lam=1, size=(100, 1000,1000))
In [4]:
filename = 'poisson.dat'
fp = np.memmap(filename, dtype='float32', mode='w+', shape=(100, 1000, 1000)) # create memmap
fp[:] = data[:] # write to disk
del fp # Deletion flushes memory changes to disk before removing the object
In [5]:
fp = np.memmap(filename, dtype='float32', mode='r', shape=(100, 1000, 1000)) # get handle to memmap
In [ ]:
fp[:, 25, 100] # retrieve small amount of data
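
As a small sketch (not part of the original notebook), the same memmap handle can be used to compute statistics block by block, so that only one \(1000 \times 1000\) slice is resident in memory at a time:

# a minimal sketch: compute the overall mean one block at a time,
# so only a single (1000, 1000) slice is read from disk at once
fp = np.memmap(filename, dtype='float32', mode='r', shape=(100, 1000, 1000))
block_means = [fp[i].mean() for i in range(fp.shape[0])]
print(np.mean(block_means))   # blocks are equal-sized, so this equals the overall mean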

Serial code / HDF5 file

Using the appropriate data structure makes a massive difference.

Load data into HDF5 file

In [11]:
%%time
if not os.path.exists('Billion1.hdf5'):
    with h5py.File('Billion1.hdf5', 'w') as f:
        for i in range(1000):
            x = np.loadtxt('x%06d.csv' % i, delimiter=',', dtype='i8')
            dset = f.create_dataset('x%06d' % i, shape=x.shape, dtype=x.dtype)
            dset[:] = x
CPU times: user 46 µs, sys: 102 µs, total: 148 µs
Wall time: 83.9 µs
In [12]:
n = 1000
start = time.time()
with h5py.File('Billion1.hdf5', 'r') as f:
    xs = [np.mean(f['x%06d' % i]) for i in range(1000)]
elapsed = time.time() - start
print(np.mean(xs), 'Total: %.2fs Per file: %.2fs' % (elapsed, elapsed/n))
99.9987 Total: 14.66s Per file: 0.01s
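
Like a memmap, an h5py dataset is a handle to data on disk, and slicing it reads only the requested portion. A small sketch, assuming the Billion1.hdf5 file created above:

# a minimal sketch: read a single row of one block without loading the rest
with h5py.File('Billion1.hdf5', 'r') as f:
    dset = f['x000000']   # dataset handle - no data read yet
    row = dset[10, :]     # reads just this 1000-element slice from disk
    print(row.mean())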

Using Dask

From the official documentation,

Dask is a simple task scheduling system that uses directed acyclic graphs (DAGs) of tasks to break up large computations into many small ones.

Dask enables parallel computing through task scheduling and blocked algorithms. This allows developers to write complex parallel algorithms and execute them in parallel either on a modern multi-core machine or on a distributed cluster.

On a single machine dask increases the scale of comfortable data from fits-in-memory to fits-on-disk by intelligently streaming data from disk and by leveraging all the cores of a modern CPU.
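
To make the idea of a task graph concrete, here is a minimal hand-built graph evaluated with the synchronous scheduler used later in this notebook (a sketch for illustration, not part of the original material):

from operator import add, mul
from dask.async import get_sync   # serial scheduler, convenient for debugging

# a dask graph is just a dict mapping keys to values or (function, arg, ...) tasks
dsk = {'x': 1,
       'y': 2,
       'z': (add, 'x', 'y'),
       'w': (mul, 'z', 10)}

get_sync(dsk, 'w')   # walks the DAG: z = 1 + 2, then w = 3 * 10 = 30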

The model for how Dask works is quite similar to Spark's, and we will see the same features:

  • lazy data structures and functions
  • functional style of chaining computations and use of higher order functions
  • trigger evaluations by actions
  • convenience wrappers for possibly dispersed data that mimic numpy arrays, dicts and pandas dataframes

Dask Arrays

These behave like numpy arrays, but break a massive job into many smaller tasks that are then executed by a scheduler. The default scheduler uses threads, but you can also use multiprocessing, distributed execution, or serial processing (mainly useful for debugging).
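
As a small sketch before the billion-number example, the same blocked pattern can be exercised on an in-memory array (the array here is made up for illustration):

# a minimal sketch: wrap an in-memory array and compute a blocked mean
x = np.random.poisson(100, (4000, 4000))
xd = da.from_array(x, chunks=(1000, 1000))          # 16 blocks of 1000 x 1000
print(xd.mean().compute())                          # default threaded scheduler
print(xd.mean().compute(get=dask.async.get_sync))   # or force serial execution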

In [35]:
n = 1000
start = time.time()
with h5py.File('Billion1.hdf5', 'r') as f:
    xs = [da.from_array(f['x%06d' % i], chunks=(1000,1000)) for i in range(n)]
    xs = da.concatenate(xs)
    avg = xs.mean().compute()
elapsed = time.time() - start
print(avg, 'Total: %.2fs Per file: %.2fs' % (elapsed, elapsed/n))
99.998724734 Total: 9.31s Per file: 0.01s

Dask bags

Dask bags work like parallel collections (multisets) of arbitrary Python objects, and are suited to unstructured or semi-structured data sets, typically spread over many files.

The AA subdirectory consists of 101 plain-text files of about 1 MB each, extracted from the English Wikipedia.
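
Before pointing a bag at files on disk, here is a tiny sketch of the same idea over an in-memory sequence (the sentences are made up for illustration):

# a minimal sketch: a bag over an in-memory sequence of strings
lines = db.from_sequence(['the cat sat', 'the cat ran', 'a dog barked'])
lines.str.split().concat().frequencies().topk(2, key=lambda x: x[1]).compute()
# e.g. [('the', 2), ('cat', 2)]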

In [37]:
b = db.from_filenames('/Volumes/HD4/data/wiki/extracted/AA/*')
In [38]:
start = time.time()
words = b.str.split().concat().frequencies().topk(10, key=lambda x: x[1])
top10 = words.compute()
elapsed = time.time() - start
print(top10, 'Total: %.2fs' % (elapsed, ))
[('the', 1051994), ('of', 617239), ('and', 482039), ('in', 370266), ('to', 356495), ('a', 312597), ('is', 174145), ('as', 145215), ('was', 141788), ('The', 141724)] Total: 99.83s

Change the scheduler

In [39]:
start = time.time()
words = b.str.split().concat().frequencies().topk(10, key=lambda x: x[1])
top10 = words.compute(get = dask.async.get_sync)
elapsed = time.time() - start
print(top10, 'Total: %.2fs' % (elapsed, ))
[('the', 1051994), ('of', 617239), ('and', 482039), ('in', 370266), ('to', 356495), ('a', 312597), ('is', 174145), ('as', 145215), ('was', 141788), ('The', 141724)] Total: 14.47s

Function chaining

In [40]:
freqs = (b.str.translate({ord(char): None for char in string.punctuation})
          .str.lower()
          .str.split()
          .concat()
          .frequencies())
In [41]:
freqs.take(5)
Out[41]:
(('strossburi', 1),
 ('dusts', 4),
 ('otoio', 1),
 ('quantales', 1),
 ('correlatives', 6))
In [42]:
freqs.topk(5, key=lambda x: x[1]).compute()
Out[42]:
[('the', 1214860),
 ('of', 619481),
 ('and', 487234),
 ('in', 438346),
 ('to', 361966)]

Visualizing the task graph

In [43]:
dot_graph(freqs.dask)
Out[43]:
[Output: task graph for the freqs computation, rendered by dot_graph]

Dask dataframes

Dask dataframes let you treat a collection of pandas dataframes, which might not fit into memory simultaneously, as a single dataframe. Note the use of globbing to specify multiple source files.
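
A smaller sketch of the lazy pattern, using just the first file created earlier (column labels are integers because header=None):

# a minimal sketch: expressions build a graph; compute() triggers the work
df1 = dd.read_csv('x000000.csv', header=None)
col_mean = df1[0].mean()    # lazy - nothing has been read yet
print(col_mean.compute())   # reads the file and returns the mean of column 0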

In [45]:
start = time.time()
df = dd.read_csv('x00000*.csv', header=None)
print(df.describe().compute())
elapsed = time.time() - start
print('Total: %.2fs' % elapsed)
                0             1             2             3             4    \
count  10000.000000  10000.000000  10000.000000  10000.000000  10000.000000
mean      99.522200    100.134400     99.460800     99.625000    100.180800
std        9.700707     10.038985     10.037736      9.917672     10.075647
min       69.000000     71.000000     62.000000     70.000000     64.000000
25%       93.000000     93.000000     93.000000     94.000000     93.000000
50%      100.000000    100.000000    100.000000    100.000000    100.000000
75%      106.000000    107.000000    106.000000    107.000000    107.000000
max      131.000000    137.000000    131.000000    137.000000    136.000000

                5             6             7            8             9    \
count  10000.000000  10000.000000  10000.000000  10000.00000  10000.000000
mean     100.281200    100.254000     99.818600     99.95520    100.009600
std       10.030164      9.826736      9.794064      9.65199     10.176319
min       66.000000     64.000000     72.000000     71.00000     69.000000
25%       93.750000     94.000000     93.750000     94.00000     93.000000
50%      100.000000    100.000000    100.000000    100.00000    100.000000
75%      107.000000    107.000000    107.000000    106.25000    107.000000
max      133.000000    133.000000    136.000000    135.00000    132.000000

           ...                990           991           992           993  \
count      ...       10000.000000  10000.000000  10000.000000  10000.000000
mean       ...          99.970000    100.148800    100.098000    100.328000
std        ...          10.160444      9.877126      9.931357     10.112559
min        ...          67.000000     67.000000     62.000000     67.000000
25%        ...          93.000000     94.000000     94.000000     93.000000
50%        ...         100.000000    100.000000    100.000000    100.000000
75%        ...         107.000000    108.000000    107.000000    107.000000
max        ...         139.000000    131.000000    136.000000    134.000000

                994           995           996           997           998  \
count  10000.000000  10000.000000  10000.000000  10000.000000  10000.000000
mean      99.636600     99.956600     99.883600    100.390200     99.722000
std        9.912475      9.999976     10.414283      9.828693      9.839959
min       70.000000     70.000000     66.000000     71.000000     67.000000
25%       93.000000     93.000000     93.000000     94.000000     93.000000
50%      100.000000     99.500000    100.000000    101.000000    100.000000
75%      106.000000    107.000000    107.000000    107.000000    107.000000
max      141.000000    133.000000    137.000000    136.000000    134.000000

                999
count  10000.000000
mean      99.936400
std        9.470276
min       69.000000
25%       94.000000
50%      100.000000
75%      107.000000
max      135.000000

[8 rows x 1000 columns]
Total: 28.32s

Converting bags to dataframes

In [69]:
df_freqs = freqs.to_dataframe(columns=['word', 'n'])
df_freqs.head(10)
Out[69]:
                    word    n
0             strossburi    1
1                  dusts    4
2                  otoio    1
3              quantales    1
4           correlatives    6
5             hangout—to    1
6                   shed  146
7  gemeenschapscommissie    3
8               pageshow    1
9                  rider   97

Dask Imperative

Sometimes you need to run custom functions that don't fit into the array, bag or dataframe abstractions. Dask provides the imperative module for this purpose, with two wrappers: do, which wraps functions, and value, which wraps objects. Apart from these wrappers and the need to call compute for evaluation, you just write regular Python code, yet it can take advantage of the Dask scheduling machinery. Note that the list comprehensions below simply build up a graph of the necessary computations; no computation is actually done until compute is called.
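
A minimal sketch of the pattern before the word-count example (the functions here are made up for illustration):

@do
def inc(x):
    return x + 1

@do
def total(xs):
    return sum(xs)

parts = [inc(i) for i in range(5)]   # five lazy tasks - no work done yet
result = total(parts)                # a task that depends on all five
result.compute()                     # evaluates the graph: 1 + 2 + 3 + 4 + 5 = 15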

In [70]:
@do
def load(filename):
    with open(filename) as f:
        return f.read()

@do
def clean(data):
    return (data
            .translate({ord(char): None for char in string.punctuation})
            .lower()
            )
@do
def analyze(sequence_of_data):
    wc = {}
    for data in sequence_of_data:
        words = data.split()
        for word in words:
            wc[word] = wc.get(word, 0) + 1
    return wc

@do
def top_k(counts, k, **kwargs):
    return sorted(counts.items(), reverse = True, **kwargs)[:k]

In [71]:
files = glob.glob('/Volumes/HD4/data/wiki/extracted/AA/*')[:3]
loaded = [load(i) for i in files]
cleaned = [clean(i) for i in loaded]
analyzed = analyze(cleaned)
top5 = top_k(analyzed, 5)

top5.compute()
Out[71]:
[('주판', 1), ('주산', 1), ('수판', 1), ('ㄢ', 1), ('ㄌㄨㄢ', 1)]
In [72]:
top_k(analyzed, 5, key=lambda x: x[1]).compute()
Out[72]:
[('the', 36659), ('of', 19458), ('and', 15522), ('in', 13509), ('to', 10843)]

Using Blaze

Blaze also works on heterogeneous data sets, and provides a high-level, consistent interface for working with data from multiple sources. Under the hood, Blaze may make use of odo, dask and pandas. Using Blaze is very similar to using pandas. See the official documentation.
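
As a small sketch over an in-memory pandas dataframe (the toy data below is made up), the same Data and by interface used on the SQLite database below applies unchanged:

import pandas as pd

# a minimal sketch: Blaze expressions over an in-memory pandas dataframe
payments = pd.DataFrame({'name': ['alice', 'bob', 'alice', 'carol'],
                         'amount': [100, 200, 300, 400]})
t = Data(payments)
by(t.name, total=t.amount.sum())   # split-apply-combine, evaluated by Blaze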

Download the Lahman baseball statistics database

See description at http://seanlahman.com/files/database/readme58.txt

In [117]:
import urllib.request

url = 'https://github.com/jknecht/baseball-archive-sqlite/raw/master/lahman2013.sqlite'
file_name = 'lahman2013.sqlite'
urllib.request.urlretrieve(url, file_name)
Out[117]:
('lahman2013.sqlite', <http.client.HTTPMessage at 0x129f826d8>)
In [118]:
db = Data('sqlite:///lahman2013.sqlite')
In [121]:
db.fields
Out[121]:
['AllstarFull',
 'Appearances',
 'AwardsManagers',
 'AwardsPlayers',
 'AwardsShareManagers',
 'AwardsSharePlayers',
 'Batting',
 'BattingPost',
 'Fielding',
 'FieldingOF',
 'FieldingPost',
 'HallOfFame',
 'Managers',
 'ManagersHalf',
 'Master',
 'Pitching',
 'PitchingPost',
 'Salaries',
 'Schools',
 'SchoolsPlayers',
 'SeriesPost',
 'Teams',
 'TeamsFranchises',
 'TeamsHalf',
 'temp']
In [183]:
birth = by(db.Master.birthCountry, n=db.Master.birthCountry.count())
birth.sort('n', ascending=False).head(5)
Out[183]:
  birthCountry      n
0          USA  16153
1         D.R.    592
2    Venezuela    301
3          CAN    243
4         P.R.    238