%matplotlib inline
import time
import datetime
import matplotlib.pyplot as plt
import numpy as np
Working with large data sets¶
Small-scale distributed programming¶
Using dask¶
For data sets that are not too big (say, up to 1 TB), it is typically sufficient to process them on a single workstation. The package dask provides three data structures that mimic regular Python data structures but perform computation in a distributed way, allowing you to make optimal use of multiple cores easily.
These structures are
- dask array ~ numpy array
- dask bag ~ Python list/multiset
- dask dataframe ~ pandas dataframe
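As a rough sketch (made-up data, once dask and pandas are installed), each collection is constructed and used much like its in-memory counterpart, with computation deferred until .compute() is called:
import pandas as pd
import dask.array as da
import dask.bag as db
import dask.dataframe as dd

# dask array ~ numpy array, evaluated lazily in blocks
x = da.ones((1000, 1000), chunks=(100, 100))
print(x.mean().compute())

# dask bag ~ a multiset of Python objects with map/filter/fold
b = db.from_sequence(range(10))
print(b.map(lambda i: i ** 2).sum().compute())

# dask dataframe ~ pandas dataframe split into partitions
pdf = pd.DataFrame({'a': range(6), 'b': list('xyzxyz')})
ddf = dd.from_pandas(pdf, npartitions=2)
print(ddf.groupby('b').a.mean().compute())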
From the official documentation,
Dask is a simple task scheduling system that uses directed acyclic graphs (DAGs) of tasks to break up large computations into many small ones.
Dask enables parallel computing through task scheduling and blocked algorithms. This allows developers to write complex parallel algorithms and execute them in parallel either on a modern multi-core machine or on a distributed cluster.
On a single machine dask increases the scale of comfortable data from fits-in-memory to fits-on-disk by intelligently streaming data from disk and by leveraging all the cores of a modern CPU.
For interesting examples of dask in practice, see Matthew Rocklin’s blog.
! pip install dask
Requirement already satisfied: dask in /Users/cliburn/anaconda2/envs/p3/lib/python3.5/site-packages
import dask
import dask.array as da
import dask.bag as db
import dask.dataframe as dd
from dask import delayed
dask arrays¶
These behave like numpy arrays, but break a massive job into tasks that are then executed by a scheduler. The default scheduler uses threading, but you can also use multiprocessing, distributed, or even serial processing (mainly for debugging). You can tell the dask array how to break the data into chunks for processing.
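For example, a minimal sketch of choosing a scheduler with the get= keyword used later in this notebook (newer dask versions replace this with compute(scheduler=...)):
a = da.ones((1000, 1000), chunks=(100, 100))
a.sum().compute()                            # default threaded scheduler
a.sum().compute(get=dask.async.get_sync)     # serial scheduler, useful for debugging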
From the official documentation:
For performance, a good choice of chunks follows the following rules:
- A chunk should be small enough to fit comfortably in memory. We’ll have many chunks in memory at once.
- A chunk must be large enough so that computations on that chunk take significantly longer than the 1ms overhead per task that dask scheduling incurs. A task should take longer than 100ms.
- Chunks should align with the computation that you want to do. For example, if you plan to frequently slice along a particular dimension, then it’s more efficient if your chunks are aligned so that you have to touch fewer chunks. If you want to add two arrays, then it’s convenient if those arrays have matching chunk patterns.
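As a rough illustration (the sizes here are made up): a 10,000 × 10,000 array of float64 split into 1,000 × 1,000 chunks gives a 10 × 10 grid of roughly 8 MB blocks, which you can inspect and realign:
x = da.ones((10000, 10000), chunks=(1000, 1000))
print(x.chunks)                  # 10 x 10 grid of 1000 x 1000 blocks
print(x.sum().compute())
x = x.rechunk((2000, 5000))      # realign chunks to match the intended access pattern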
# We reuse the 100 * 1000 * 1000 random numbers in the memmap file on disk
n = 100
filename = 'random.dat'
shape = (n, 1000, 1000)
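# (Assumed setup) If random.dat was not created by an earlier notebook,
# generate it here (an ~800 MB file) so the read-only memmap below can be opened.
import os
if not os.path.exists(filename):
    fp_w = np.memmap(filename, dtype='float64', mode='w+', shape=shape)
    for i in range(n):
        fp_w[i] = np.random.random(shape[1:])
    del fp_w  # flush the data to disk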
fp = np.memmap(filename, dtype='float64', mode='r', shape=shape)
# We can decide on the chunk size to be distributed for computing
xs = [da.from_array(fp[i], chunks=(200,500)) for i in range(n)]
xs = da.concatenate(xs)
avg = xs.mean().compute()
avg
# Typically we store dask arrays in HDF5 files
import h5py

da.to_hdf5('data/xs.hdf5', '/foo/xs', xs)

with h5py.File('data/xs.hdf5', 'r') as f:
    print(f.get('/foo/xs').shape)
dask data frames¶
Dask dataframes let you treat multiple pandas dataframes, which together might not fit into memory, as a single dataframe. Note the use of globbing below to specify multiple source files.
for i in range(5):
    f = 'data/x%03d.csv' % i
    np.savetxt(f, np.random.random((1000, 5)), delimiter=',')
df = dd.read_csv('data/x*.csv', header=None)
print(df.describe().compute())
dask bags¶
Dask bags work like multisets for unstructured or semi-structured data sets, typically over many files. A multiset is a set that allows repeats. Unlike lists, order is not preserved.
The dask bag is often used for preprocessing data before conversion to the more efficient array or dataframe collections. Manipulating dask bags has a functional flavor, similar to using toolz for standard Python collections.
Creating a bag¶
bag = db.from_sequence(np.random.randint(0,4, 10))
bag.frequencies().compute()
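Manipulating a bag of semi-structured records has the functional flavor mentioned above; the records here are made up for illustration:
records = db.from_sequence([
    {'name': 'alice', 'age': 30},
    {'name': 'bob',   'age': 25},
    {'name': 'carol', 'age': 35},
])
# filter and pluck compose lazily, much like toolz on ordinary sequences
print(records.filter(lambda r: r['age'] > 26).pluck('name').compute())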
The AA subdirectory consists of 101 1 MB plain text files from the English Wikipedia¶
text = db.read_text('data/wiki/AA/*')
%%time
words = text.str.split().concat().frequencies().topk(10, key=lambda x: x[1])
top10 = words.compute()
print(top10)
This is slow because of disk access. We can reduce the overhead by switching to the synchronous single-threaded scheduler (get_sync), since the parallel scheduler’s overhead is wasted on a job dominated by disk I/O.
%%time
words = text.str.split().concat().frequencies().topk(10, key=lambda x: x[1])
top10 = words.compute(get = dask.async.get_sync)
print(top10)
Conversion from bag to dataframe¶
import string
freqs = (text.
         str.translate({ord(char): None for char in string.punctuation}).
         str.lower().
         str.split().
         concat().
         frequencies())
Get the top 5 words sorted by key (not value)¶
freqs.topk(5).compute(get = dask.async.get_sync)
df_freqs = freqs.to_dataframe(columns=['word', 'n'])
df_freqs.head(n=5)
The compute method converts to a regular pandas dataframe¶
For data sets that fit in memory, pandas is faster and allows some operations like sorting that are not provided by dask dataframes.
df = df_freqs.compute()
df.sort_values('word', ascending=False).head(5)
dask delayed¶
For fully custom pipelines, you can use the delayed function. This simply wraps standard Python functions so that they are not evaluated until the scheduler calls upon them to do so. You can think of delayed as converting an eager function into a lazy one. You generally use delayed when the processing task is not easily expressed with the array, bag or data frame abstractions, since delayed gives you the full power of Python.
It is easy to convert to and from delayed with the array, bag or data frame parallel data structures using the to_delayed() and from_delayed() methods.
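For instance, a minimal sketch (array and chunk sizes made up) that round-trips a dask array through delayed objects, doubling each chunk along the way:
a = da.arange(10, chunks=5)
parts = a.to_delayed().ravel()          # one delayed object per chunk
doubled = [delayed(lambda block: block * 2)(p) for p in parts]
a2 = da.concatenate([da.from_delayed(d, shape=(5,), dtype=a.dtype)
                     for d in doubled])
print(a2.compute())                     # [ 0  2  4 ... 18]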
We will show the simple example provided in the dask documentation.
def inc(x): return x + 1
def double(x): return x * 2
def add(x, y): return x + y
data = [1, 2, 3, 4, 5]
output = []
for x in data:
    a = delayed(inc)(x)
    b = delayed(double)(x)
    c = delayed(add)(a, b)
    output.append(c)
total = delayed(sum)(output)
total.compute()
The DAG (directed acyclic graph) of tasks built by dask¶
Using this graph, the scheduler can identify opportunities for parallelism.
total.visualize()
Minimizing computation of intermediate objects¶
x = da.random.randint(0, 5, 10, chunks=(5,))
y = (x + 1).sum()
z = (x + 1).mean()
# Passing both results to da.compute lets dask share the intermediate (x + 1)
# and traverse x only once, rather than recomputing it separately for y and z.
da.compute(y, z)