Biggish Data¶
We shall discuss libraries that are useful when your data is too big to fit in memory, but probably not big enough to justify the added complexity of moving to a cluster. With today's technology, this covers data sets of roughly tens to hundreds of gigabytes.
One goal is to introduce the storage library h5py and the pydata packages odo, dask and blaze.
In [1]:
import os
import glob
import time
import string
import numpy as np
import h5py
from concurrent.futures import ProcessPoolExecutor
import dask
import dask.bag as db
import dask.dataframe as dd
import dask.array as da
from dask.dot import dot_graph
from dask.imperative import do, value
from odo import odo, drop, discover
from blaze import dshape, Data, by
1 billion numbers¶
We first create 1 billion numbers in \(1000 \times 1000\) blocks and store them as numpy binary files.
In [2]:
def save_csv(i):
    x = np.random.poisson(100, (1000, 1000))
    np.savetxt('x%06d.csv' % i, x, delimiter=',', fmt='%d')
In [3]:
np.random.seed(123)
with ProcessPoolExecutor() as pool:
    pool.map(save_csv, range(1000))
Memory usage¶
In [4]:
%load_ext memory_profiler
In [5]:
%memit x = np.loadtxt('x%06d.csv' % 0, delimiter=',')
peak memory: 213.06 MiB, increment: 37.47 MiB
In [6]:
def process_one(f):
    x = np.loadtxt(f, delimiter=',')
    return x.mean()
Serial code / text file¶
In [7]:
n = 10
start = time.time()
xs = [process_one('x%06d.csv' % i) for i in range(n)]
elapsed = time.time() - start
print(np.mean(xs), 'Total: %.2fs Per file: %.2fs' % (elapsed, elapsed/n))
100.009394 Total: 21.02s Per file: 2.10s
Parallel code / text file¶
In [8]:
n = 100
start = time.time()
with ProcessPoolExecutor() as pool:
    xs = list(pool.map(process_one, ('x%06d.csv' % i for i in range(n))))
elapsed = time.time() - start
print(np.mean(xs), 'Total: %.2fs Per file: %.2fs' % (elapsed, elapsed/n))
99.9980644 Total: 29.34s Per file: 0.29s
Serial code / memmap¶
Memory-mapped files are used to access small segments of large files on disk without reading the entire file into memory. A numpy.memmap object can be used anywhere an ndarray is used.
In [3]:
data = np.random.poisson(lam=1, size=(100, 1000,1000))
In [4]:
filename = 'poisson.dat'
fp = np.memmap(filename, dtype='float32', mode='w+', shape=(100, 1000, 1000)) # create memmap
fp[:] = data[:] # write to disk
del fp # Deletion flushes memory changes to disk before removing the object
In [5]:
fp = np.memmap(filename, dtype='float32', mode='r', shape=(100, 1000, 1000)) # get handle to memmap
In [ ]:
fp[:, 25, 100] # retrieve small amount of data
Serial code / HDF5 file¶
Using the appropriate data structure makes a massive difference.
Load data into HDF5 file¶
In [11]:
%%time
if not os.path.exists('Billion1.hdf5'):
    with h5py.File('Billion1.hdf5', 'w') as f:
        for i in range(1000):
            x = np.loadtxt('x%06d.csv' % i, delimiter=',', dtype='i8')
            dset = f.create_dataset('x%06d' % i, shape=x.shape)
            dset[:] = x
CPU times: user 46 µs, sys: 102 µs, total: 148 µs
Wall time: 83.9 µs
In [12]:
n = 1000
start = time.time()
with h5py.File('Billion1.hdf5', 'r') as f:
    xs = [np.mean(f['x%06d' % i]) for i in range(n)]
elapsed = time.time() - start
print(np.mean(xs), 'Total: %.2fs Per file: %.2fs' % (elapsed, elapsed/n))
99.9987 Total: 14.66s Per file: 0.01s
Using Dask¶
From the official documentation,
Dask is a simple task scheduling system that uses directed acyclic graphs (DAGs) of tasks to break up large computations into many small ones.
Dask enables parallel computing through task scheduling and blocked algorithms. This allows developers to write complex parallel algorithms and execute them in parallel either on a modern multi-core machine or on a distributed cluster.
On a single machine dask increases the scale of comfortable data from fits-in-memory to fits-on-disk by intelligently streaming data from disk and by leveraging all the cores of a modern CPU.
The model for how Dask works is quite similar to Spark, and we will see the same features:

- lazy data structures and functions
- functional style of chaining computations and use of higher order functions
- evaluations triggered by actions
- convenience wrappers for possibly dispersed data that mimic numpy arrays, dicts and pandas dataframes
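To make the DAG idea concrete, here is a minimal sketch (not part of the original analysis): a dask task graph is just a Python dict mapping keys to data or to tasks, i.e. tuples of a function and its arguments, and a scheduler walks this graph.

from operator import add, mul

# a hand-written task graph: keys map to data or to (function, arg, arg, ...) tuples
dsk = {'a': 1,
       'b': 2,
       'c': (add, 'a', 'b'),   # c = a + b
       'd': (mul, 'c', 10)}    # d = c * 10

dask.async.get_sync(dsk, 'd')  # walk the DAG with the serial scheduler -> 30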
Dask Arrays¶
These behave like numpy arrays, but break a massive job into tasks that are then executed by a scheduler. The default scheduler uses threading, but you can also use multiprocessing, distributed or even serial processing (mainly for debugging).
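As a quick sketch (synthetic data, not from the original notebook), the same blocked pattern works on an array built directly in dask, and compute accepts a get= argument to choose the scheduler, just as we do for bags further below.

x = da.ones((4000, 4000), chunks=(1000, 1000))  # 16 blocks of 1000 x 1000
x.mean().compute()                              # default threaded scheduler
x.mean().compute(get=dask.async.get_sync)       # serial scheduler, handy for debugging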
In [35]:
n = 1000
start = time.time()
with h5py.File('Billion1.hdf5', 'r') as f:
    xs = [da.from_array(f['x%06d' % i], chunks=(1000, 1000)) for i in range(n)]
    xs = da.concatenate(xs)
    avg = xs.mean().compute()
elapsed = time.time() - start
print(avg, 'Total: %.2fs Per file: %.2fs' % (elapsed, elapsed/n))
99.998724734 Total: 9.31s Per file: 0.01s
Dask bags¶
Dask bags are parallel collections of generic Python objects (think of a list or multiset rather than an array), suited to unstructured or semi-structured data sets that are typically spread over many files.
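As a toy sketch with made-up data (not the Wikipedia corpus used below), a bag can also be built from an in-memory sequence and supports the same lazy, chainable operations.

toy = db.from_sequence(['the cat sat', 'the dog sat', 'the cat ran'])
toy.str.split().concat().frequencies().topk(2, key=lambda x: x[1]).compute()
# [('the', 3), ('sat', 2)]  (ties among the count-2 words may order differently)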
The AA subdirectory consists of 101 plain text files of 1 MB each from the English Wikipedia¶
In [37]:
b = db.from_filenames('/Volumes/HD4/data/wiki/extracted/AA/*')
In [38]:
start = time.time()
words = b.str.split().concat().frequencies().topk(10, key=lambda x: x[1])
top10 = words.compute()
elapsed = time.time() - start
print(top10, 'Total: %.2fs' % (elapsed, ))
[('the', 1051994), ('of', 617239), ('and', 482039), ('in', 370266), ('to', 356495), ('a', 312597), ('is', 174145), ('as', 145215), ('was', 141788), ('The', 141724)] Total: 99.83s
Change the scheduler¶
In [39]:
start = time.time()
words = b.str.split().concat().frequencies().topk(10, key=lambda x: x[1])
top10 = words.compute(get=dask.async.get_sync)
elapsed = time.time() - start
print(top10, 'Total: %.2fs' % (elapsed, ))
[('the', 1051994), ('of', 617239), ('and', 482039), ('in', 370266), ('to', 356495), ('a', 312597), ('is', 174145), ('as', 145215), ('was', 141788), ('The', 141724)] Total: 14.47s
Function chaining¶
In [40]:
freqs = (b.str.translate({ord(char): None for char in string.punctuation})
          .str.lower()
          .str.split()
          .concat()
          .frequencies())
In [41]:
freqs.take(5)
Out[41]:
(('strossburi', 1),
('dusts', 4),
('otoio', 1),
('quantales', 1),
('correlatives', 6))
In [42]:
freqs.topk(5, key=lambda x: x[1]).compute()
Out[42]:
[('the', 1214860),
('of', 619481),
('and', 487234),
('in', 438346),
('to', 361966)]
Dask dataframes¶
Dask dataframes treat multiple pandas dataframes, which together might not fit into memory, as a single logical dataframe. Note the use of globbing to specify multiple source files.
In [45]:
start = time.time()
df = dd.read_csv('x00000*.csv', header=None)
print(df.describe().compute())
elapsed = time.time() - start
print('Total: %.2fs' % elapsed)
0 1 2 3 4 \
count 10000.000000 10000.000000 10000.000000 10000.000000 10000.000000
mean 99.522200 100.134400 99.460800 99.625000 100.180800
std 9.700707 10.038985 10.037736 9.917672 10.075647
min 69.000000 71.000000 62.000000 70.000000 64.000000
25% 93.000000 93.000000 93.000000 94.000000 93.000000
50% 100.000000 100.000000 100.000000 100.000000 100.000000
75% 106.000000 107.000000 106.000000 107.000000 107.000000
max 131.000000 137.000000 131.000000 137.000000 136.000000
5 6 7 8 9 \
count 10000.000000 10000.000000 10000.000000 10000.00000 10000.000000
mean 100.281200 100.254000 99.818600 99.95520 100.009600
std 10.030164 9.826736 9.794064 9.65199 10.176319
min 66.000000 64.000000 72.000000 71.00000 69.000000
25% 93.750000 94.000000 93.750000 94.00000 93.000000
50% 100.000000 100.000000 100.000000 100.00000 100.000000
75% 107.000000 107.000000 107.000000 106.25000 107.000000
max 133.000000 133.000000 136.000000 135.00000 132.000000
... 990 991 992 993 \
count ... 10000.000000 10000.000000 10000.000000 10000.000000
mean ... 99.970000 100.148800 100.098000 100.328000
std ... 10.160444 9.877126 9.931357 10.112559
min ... 67.000000 67.000000 62.000000 67.000000
25% ... 93.000000 94.000000 94.000000 93.000000
50% ... 100.000000 100.000000 100.000000 100.000000
75% ... 107.000000 108.000000 107.000000 107.000000
max ... 139.000000 131.000000 136.000000 134.000000
994 995 996 997 998 \
count 10000.000000 10000.000000 10000.000000 10000.000000 10000.000000
mean 99.636600 99.956600 99.883600 100.390200 99.722000
std 9.912475 9.999976 10.414283 9.828693 9.839959
min 70.000000 70.000000 66.000000 71.000000 67.000000
25% 93.000000 93.000000 93.000000 94.000000 93.000000
50% 100.000000 99.500000 100.000000 101.000000 100.000000
75% 106.000000 107.000000 107.000000 107.000000 107.000000
max 141.000000 133.000000 137.000000 136.000000 134.000000
999
count 10000.000000
mean 99.936400
std 9.470276
min 69.000000
25% 94.000000
50% 100.000000
75% 107.000000
max 135.000000
[8 rows x 1000 columns]
Total: 28.32s
Converting bags to dataframes¶
In [69]:
df_freqs = freqs.to_dataframe(columns=['word', 'n'])
df_freqs.head(10)
Out[69]:
|   | word | n |
|---|---|---|
| 0 | strossburi | 1 |
| 1 | dusts | 4 |
| 2 | otoio | 1 |
| 3 | quantales | 1 |
| 4 | correlatives | 6 |
| 5 | hangout—to | 1 |
| 6 | shed | 146 |
| 7 | gemeenschapscommissie | 3 |
| 8 | pageshow | 1 |
| 9 | rider | 97 |
Dask Imperative¶
Sometimes you need to run custom functions that don't fit into the array, bag or dataframe abstractions. Dask provides the imperative module for this purpose, with do, a decorator that wraps functions, and value, which wraps plain objects into lazy values. Apart from these wrappers and the need to call compute for evaluation, you just write regular Python code, yet it can take advantage of the Dask scheduling machinery. Note that the list comprehensions below simply build up a graph of the necessary computations; no computation is actually done until compute is called.
In [70]:
@do
def load(filename):
    with open(filename) as f:
        return f.read()

@do
def clean(data):
    return (data
            .translate({ord(char): None for char in string.punctuation})
            .lower()
            )

@do
def analyze(sequence_of_data):
    wc = {}
    for data in sequence_of_data:
        words = data.split()
        for word in words:
            wc[word] = wc.get(word, 0) + 1
    return wc

@do
def top_k(counts, k, **kwargs):
    return sorted(counts.items(), reverse=True, **kwargs)[:k]
In [71]:
files = glob.glob('/Volumes/HD4/data/wiki/extracted/AA/*')[:3]
loaded = [load(i) for i in files]
cleaned = [clean(i) for i in loaded]
analyzed = analyze(cleaned)
top5 = top_k(analyzed, 5)
top5.compute()
Out[71]:
[('주판', 1), ('주산', 1), ('수판', 1), ('ㄢ', 1), ('ㄌㄨㄢ', 1)]
In [72]:
top_k(analyzed, 5, key=lambda x: x[1]).compute()
Out[72]:
[('the', 36659), ('of', 19458), ('and', 15522), ('in', 13509), ('to', 10843)]
Using Blaze¶
Blase also works on heterogeneous data sets, and provides a high level
conssitent interface for working with data from mulitple sources. Under
the hood, blaze
may make use of odo
, dask
and pandas
.
Using blaze
is very similar to usage pandas
. See official
documentation.
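Although the queries below do not call it directly, odo (imported above) is the conversion layer: given a source and a target, it moves data between formats and containers. A minimal sketch, using one of the CSV files created earlier and a made-up SQLite file name:

import pandas as pd

df = odo('x000000.csv', pd.DataFrame)   # CSV file -> in-memory DataFrame
discover(df)                            # infer its datashape
odo(df, 'sqlite:///example.db::x0')     # DataFrame -> table 'x0' in a SQLite file (hypothetical)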
Download the Lahman baseball statistics database¶
See description at http://seanlahman.com/files/database/readme58.txt
In [117]:
import urllib.request
url = 'https://github.com/jknecht/baseball-archive-sqlite/raw/master/lahman2013.sqlite'
file_name = 'lahman2013.sqlite'
urllib.request.urlretrieve(url, file_name)
Out[117]:
('lahman2013.sqlite', <http.client.HTTPMessage at 0x129f826d8>)
In [118]:
db = Data('sqlite:///lahman2013.sqlite')
In [121]:
db.fields
Out[121]:
['AllstarFull',
'Appearances',
'AwardsManagers',
'AwardsPlayers',
'AwardsShareManagers',
'AwardsSharePlayers',
'Batting',
'BattingPost',
'Fielding',
'FieldingOF',
'FieldingPost',
'HallOfFame',
'Managers',
'ManagersHalf',
'Master',
'Pitching',
'PitchingPost',
'Salaries',
'Schools',
'SchoolsPlayers',
'SeriesPost',
'Teams',
'TeamsFranchises',
'TeamsHalf',
'temp']
In [183]:
birth = by(db.Master.birthCountry, n=db.Master.birthCountry.count())
birth.sort('n', ascending=False).head(5)
Out[183]:
|   | birthCountry | n |
|---|---|---|
| 0 | USA | 16153 |
| 1 | D.R. | 592 |
| 2 | Venezuela | 301 |
| 3 | CAN | 243 |
| 4 | P.R. | 238 |