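Hadoop is a framework for distributed programming that handles failures transparently and provides a way to robustly code programs for execution on a cluster. The main modules are Hadoop Common (shared utilities), HDFS (the distributed file system), YARN (resource management and job scheduling) and MapReduce (parallel processing of large data sets).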
There is also an ecosystem of tools with very whimsical names built upon
the Hadoop framework, and this ecosystem can be bewildering. We will
mainly look at distributed computing alternatives to MapReduce that can
run on HDFS, specifically Spark and Impala. Also of interest is a parallel
machine learning library built on top of Spark.
See the official documentation.
The simplest way to try out the Hadoop system is probably to install the Cloudera Virtual Machine image or to use Amazon Elastic MapReduce. If you install from scratch, there are some configuration steps to overcome. The following example assumes that Hadoop has been installed locally and the path to Hadoop executables has been exported.
from __future__ import division
import os
import sys
import glob
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
%matplotlib inline
%precision 4
! pip install toolz
from toolz import countby, groupby, accumulate, reduce, compose, partition, concat
from operator import add, itemgetter
x = range(10)
map(lambda x: x*x, filter(lambda x: x%2==0, x))
[0, 4, 16, 36, 64]
flatmap = compose(concat, map)
from string import split
s = ["hello world", "this is the end"]
print list(map(split, s))
print list(flatmap(split, s))
[['hello', 'world'], ['this', 'is', 'the', 'end']]
['hello', 'world', 'this', 'is', 'the', 'end']
s = 'aabaabcdeda'
a = [(_, 1) for _ in s]
print a
[('a', 1), ('a', 1), ('b', 1), ('a', 1), ('a', 1), ('b', 1), ('c', 1), ('d', 1), ('e', 1), ('d', 1), ('a', 1)]
g = groupby(itemgetter(0), a)
[item[0] for item in g.itervalues()]
[('a', 1), ('c', 1), ('b', 1), ('e', 1), ('d', 1)]
groupby(itemgetter(0), a)
{'a': [('a', 1), ('a', 1), ('a', 1), ('a', 1), ('a', 1)],
'b': [('b', 1), ('b', 1)],
'c': [('c', 1)],
'd': [('d', 1), ('d', 1)],
'e': [('e', 1)]}
countby(itemgetter(0), a)
{'a': 5, 'b': 2, 'c': 1, 'd': 2, 'e': 1}
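A Hadoop job consists of the input file(s) on HDFS, \(m\) map tasks and \(n\) reduce tasks, and the output is \(n\) files. The stages of one map-reduce iteration are: the input is split among the map tasks, each mapper emits key-value pairs, the pairs are shuffled and sorted so that all values for a given key reach the same reducer, and each reducer writes its results to one output file.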
At each such iteration, input is read from HDFS and given to the mapper, and output is written back to HDFS by the reducer. Optimizing the MapReduce pipeline often consists of minimizing these I/O transfers.
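The flow from input splits through map and reduce tasks to \(n\) outputs can be sketched in plain Python. This is only an illustration, not how Hadoop is implemented: the function names are made up for this example, the "files" are Python lists, and the partition rule is a deterministic toy stand-in for Hadoop's hash-based partitioner.

```python
from collections import defaultdict

def mapper(line):
    # Map stage: emit a (word, 1) pair for every word in the line.
    for word in line.split():
        yield word, 1

def partition(key, n_reducers):
    # Toy rule (an assumption for this sketch) deciding which
    # reduce task receives a given key.
    return sum(ord(c) for c in key) % n_reducers

def run_mapreduce(splits, n_reducers):
    # One bucket of {key: [values]} per reduce task.
    buckets = [defaultdict(list) for _ in range(n_reducers)]
    # Map + shuffle: each input split is handled by one map task.
    for split in splits:
        for line in split:
            for key, value in mapper(line):
                buckets[partition(key, n_reducers)][key].append(value)
    # Sort + reduce: each reduce task sees its keys in sorted order
    # and produces one output "file" (here, a list of pairs).
    return [[(key, sum(bucket[key])) for key in sorted(bucket)]
            for bucket in buckets]

splits = [["a b a"], ["b c", "a"]]              # m = 2 input splits
outputs = run_mapreduce(splits, n_reducers=2)   # n = 2 output parts
for i, part in enumerate(outputs):
    print('part-%05d: %s' % (i, part))
```

Every value for a given key ends up in exactly one of the output parts, which is why the reduce stage can be run in parallel.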
s = 'aabaabcdeda'
xs = map(lambda x: [x, 1], s)
[['a', 1],
['a', 1],
['b', 1],
['a', 1],
['a', 1],
['b', 1],
['c', 1],
['d', 1],
['e', 1],
['d', 1],
['a', 1]]
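xs = sorted(xs)
ys = []
seen = []
for x in xs:
    if x[0] not in seen:
        # First occurrence of this key: start a new group.
        seen.append(x[0])
        ys.append([x[0], [x[1]]])
    else:
        # The input is sorted, so the key belongs to the last group.
        ys[-1][1].append(x[1])
ys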
[['a', [1, 1, 1, 1, 1]], ['b', [1, 1]], ['c', [1]], ['d', [1, 1]], ['e', [1]]]
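The grouped list above is what a reducer receives. A minimal sketch of the remaining reduce step, assuming we simply want the total per key (the variable `counts` is introduced here for illustration):

```python
# Grouped output of the shuffle/sort step, copied from the result above.
ys = [['a', [1, 1, 1, 1, 1]], ['b', [1, 1]], ['c', [1]], ['d', [1, 1]], ['e', [1]]]

# Reduce: collapse each key's list of values into a single number.
counts = [[key, sum(values)] for key, values in ys]
print(counts)
```

The totals agree with the result of countby shown earlier.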
We want to count the number of times each word occurs in a set of books. We will do this in Python.
This will generate a lot of chatter
hdfs dfs -mkdir /user
hdfs dfs -mkdir /user/cliburn
hdfs dfs -copyFromLocal books /user/cliburn/books
%%file mapper.py
#!/usr/bin/env python
import sys

def read_input(file):
    # Split each input line into a list of words.
    for line in file:
        yield line.split()

def main(sep='\t'):
    data = read_input(sys.stdin)
    for words in data:
        for word in words:
            # Emit one "word<TAB>1" record per word.
            print('%s%s%d' % (word, sep, 1))

if __name__ == '__main__':
    main()
%%file reducer.py
#!/usr/bin/env python
from itertools import groupby
from operator import itemgetter
import sys

def read_mapper_output(file, sep):
    # Split each "word<TAB>count" record into [word, count].
    for line in file:
        yield line.rstrip().split(sep, 1)

def main(sep='\t'):
    data = read_mapper_output(sys.stdin, sep=sep)
    # Hadoop sorts the mapper output by key, so equal words arrive consecutively.
    for word, group in groupby(data, itemgetter(0)):
        total_count = sum(int(count) for _, count in group)
        print('%s%s%d' % (word, sep, total_count))

if __name__ == '__main__':
    main()
! chmod +x mapper.py
! chmod +x reducer.py
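Before submitting anything to the cluster, it can help to check the mapper and reducer logic locally. The sketch below re-implements the two scripts as generator functions (`map_lines` and `reduce_lines` are names invented for this example) and uses Python's `sorted` as a stand-in for the sort that Hadoop performs between the two stages. The input lines are made up.

```python
from itertools import groupby
from operator import itemgetter

def map_lines(lines, sep='\t'):
    # Same logic as mapper.py: one "word<TAB>1" record per word.
    for line in lines:
        for word in line.split():
            yield '%s%s%d' % (word, sep, 1)

def reduce_lines(records, sep='\t'):
    # Same logic as reducer.py: sum the counts of consecutive equal keys.
    pairs = (record.rstrip().split(sep, 1) for record in records)
    for word, group in groupby(pairs, itemgetter(0)):
        total = sum(int(count) for _, count in group)
        yield '%s%s%d' % (word, sep, total)

lines = ["the cat sat", "the end"]      # made-up input
shuffled = sorted(map_lines(lines))     # stands in for Hadoop's sort
result = list(reduce_lines(shuffled))
print(result)
```

A similar check is often done from the shell by piping a text file through ./mapper.py, sort and ./reducer.py.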
The native language for Hadoop is Java, but Hadoop streaming allows the mapper, combiner and reducer functions to be written as custom programs in other languages. For the full set of options, see http://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/HadoopStreaming.html
hadoop jar $HADOOP_HOME/libexec/share/hadoop/tools/lib/hadoop-*streaming*.jar \
-file ./mapper.py -mapper ./mapper.py \
-file ./reducer.py -reducer ./reducer.py \
-input /user/cliburn/books/* -output /user/cliburn/books-output
hdfs dfs -ls /user/cliburn/books-output
hdfs dfs -cat /user/cliburn/books-output/part-00000
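The job above uses only a mapper and a reducer. A combiner pre-aggregates each map task's output before it is shuffled to the reducers, which reduces the amount of data transferred. The following is a rough in-memory sketch of that idea with made-up input splits and illustrative function names; it does not use Hadoop itself.

```python
from collections import Counter

def map_task(lines):
    # Emit one (word, 1) pair per word, as mapper.py does.
    return [(word, 1) for line in lines for word in line.split()]

def combine(pairs):
    # Combiner: sum counts locally, within a single map task's output.
    local = Counter()
    for word, count in pairs:
        local[word] += count
    return sorted(local.items())

def reduce_all(pairs):
    # Reducer: sum counts per word across all map tasks.
    total = Counter()
    for word, count in pairs:
        total[word] += count
    return dict(total)

splits = [["a a a b"], ["a b b"]]       # two made-up input splits
plain = [p for s in splits for p in map_task(s)]
combined = [p for s in splits for p in combine(map_task(s))]

# Fewer records cross the network with a combiner, same final answer.
print('%d records without combiner, %d with' % (len(plain), len(combined)))
print(reduce_all(plain) == reduce_all(combined))
```

For word count, the reducer itself can typically serve as the combiner because addition is associative and commutative. Hadoop streaming accepts a combiner through its -combiner option.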
./sbin/stop-yarn.sh
./sbin/stop-dfs.sh
The Python module mrjob removes a lot of the boilerplate and can also send jobs to Amazon’s implementation of Hadoop, known as Elastic MapReduce (EMR).
! pip install mrjob
%%file word_count.py
# From http://mrjob.readthedocs.org/en/latest/guides/quickstart.html#writing-your-first-job
from mrjob.job import MRJob

class MRWordFrequencyCount(MRJob):

    def mapper(self, _, line):
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1

    def reducer(self, key, values):
        yield key, sum(values)

if __name__ == '__main__':
    MRWordFrequencyCount.run()
As a single Python process for debugging
python word_count.py books/*
To run on a Hadoop cluster
python word_count.py -r hadoop books/*
To run on Amazon EMR using files on S3
python word_count.py -r emr s3://<path_to_books>
For comparison, here is the first Java version from the official tutorial:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
Most Hadoop workflows are organized as several rounds of map/reduce;
this is known as job chaining. Because I/O is so expensive, two
optimizations are common: chain folding, where jobs are rearranged to
minimize inputs and outputs, and job merging, where unrelated jobs using
the same dataset are run together. In mrjob, job chaining is performed
via the steps method.
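To make the idea of chaining concrete, here is a small pure-Python sketch in which the output of one map/reduce round becomes the input of the next. It does not use the mrjob API. The helpers `run_step` and `run_chain` and the two-step job (count words, then pick the most frequent one) are assumptions made for illustration.

```python
from itertools import groupby
from operator import itemgetter

def run_step(records, mapper, reducer):
    # One map/reduce round: map, sort by key (the shuffle), then reduce.
    mapped = sorted((kv for rec in records for kv in mapper(rec)),
                    key=itemgetter(0))
    out = []
    for key, group in groupby(mapped, key=itemgetter(0)):
        out.extend(reducer(key, [value for _, value in group]))
    return out

def run_chain(records, steps):
    # Feed the output of each step into the next one.
    for mapper, reducer in steps:
        records = run_step(records, mapper, reducer)
    return records

# Step 1: count words.
def map_words(line):
    for word in line.split():
        yield word, 1

def reduce_count(word, counts):
    yield word, sum(counts)

# Step 2: send every (word, count) to a single key and keep the maximum.
def map_to_one_key(pair):
    word, count = pair
    yield 'max', (count, word)

def reduce_max(key, values):
    yield max(values)

lines = ["a b a", "c a b"]              # made-up input
steps = [(map_words, reduce_count), (map_to_one_key, reduce_max)]
most_common = run_chain(lines, steps)
print(most_common)
```

On a real cluster, each step would write its output to HDFS and the next step would read it back, which is the I/O cost that chain folding and job merging try to reduce.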
There are several common patterns that are repeatedly used in Hadoop MapReduce programs:
While it is certainly possible, it will take a lot of work to code, debug
and optimize any non-trivial program using just MapReduce constructs, for
example regularized logistic regression on a large data set. Hence, we
will switch our focus to more modern tools such as Spark
that provide higher level abstractions and often greater performance.
Spark provides a much richer set of programming constructs and libraries that greatly simplify concurrent programming. In addition, because Spark data can be persistent over a session (unlike MapReduce, which reads and writes data at each step in the job chain), it can be much faster for iterative programs and also enables interactive concurrent programming. See the official documentation for details, including setting up on Amazon. This article on how to set up Spark on EMR may also be helpful.
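The benefit of persistent data for iterative programs can be illustrated without Spark. In the toy sketch below, a `Storage` class (invented for this example) stands in for HDFS and counts how often the data set is read. The iterative computation is a simple gradient descent towards the mean of the data, chosen only because it needs several passes.

```python
class Storage(object):
    # Stand-in for HDFS that counts how many times the data set is read.
    def __init__(self, data):
        self._data = list(data)
        self.reads = 0

    def load(self):
        self.reads += 1
        return list(self._data)

def step(data, estimate, rate=0.5):
    # One gradient step towards the mean of the data.
    gradient = sum(estimate - x for x in data) / float(len(data))
    return estimate - rate * gradient

def iterate_reloading(storage, n_iter):
    # MapReduce style: every iteration is a new job that re-reads its input.
    estimate = 0.0
    for _ in range(n_iter):
        estimate = step(storage.load(), estimate)
    return estimate

def iterate_cached(storage, n_iter):
    # Spark style: load once, keep the data in memory across iterations.
    data = storage.load()
    estimate = 0.0
    for _ in range(n_iter):
        estimate = step(data, estimate)
    return estimate

disk_a = Storage([1, 2, 3, 4])
disk_b = Storage([1, 2, 3, 4])
est_a = iterate_reloading(disk_a, 10)
est_b = iterate_cached(disk_b, 10)
print('reads: %d vs %d' % (disk_a.reads, disk_b.reads))
```

Both versions compute the same estimate, but the first reads the input once per iteration. In Spark itself, keeping an RDD in memory is requested with its cache or persist methods.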
Very conveniently for learning, Spark provides a REPL shell where you can interactively type and run Spark programs. For example, this will open a Spark shell as an IPython Notebook (if Spark is installed and pyspark is on your path):
IPYTHON_OPTS="notebook" pyspark
To whet your appetite, here is the standalone Spark version of the word count program.
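%%file spark_count.py
from pyspark import SparkConf, SparkContext

# Run locally in a single process under the application name "Word Count".
conf = SparkConf().setMaster("local").setAppName("Word Count")
sc = SparkContext(conf=conf)

rdd = sc.textFile("<path_to_books>")
# Split every line into words and flatten the results into one RDD of words.
words = rdd.flatMap(lambda x: x.split())
# countByValue returns the word counts to the driver as a dictionary.
result = words.countByValue()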
And this is run by typing on the command line
bin/spark-submit spark_count.py
Of course, spark-submit has many options that can be provided to
configure the job.
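For readers without a Spark installation, the two operations used in spark_count.py can be mimicked on an ordinary Python list. This is only an analogy to show what flatMap and countByValue compute: the helper names `flat_map` and `count_by_value` are invented here, and nothing in the sketch is distributed.

```python
from collections import Counter
from itertools import chain

def flat_map(func, items):
    # Apply func to each item and flatten the resulting lists into one stream.
    return chain.from_iterable(func(item) for item in items)

def count_by_value(items):
    # Tally how often each distinct value occurs, returned as a dict.
    return dict(Counter(items))

lines = ["hello world", "hello spark"]   # stands in for sc.textFile(...)
words = flat_map(lambda x: x.split(), lines)
result = count_by_value(words)
print(sorted(result.items()))
```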