Pydoop: a Python MapReduce and HDFS API for Hadoop
Simone Leo
CRS4
Pula, CA, Italy
simone.leo@crs4.it
Gianluigi Zanetti
CRS4
Pula, CA, Italy
gianluigi.zanetti@crs4.it
ABSTRACT
MapReduce has become increasingly popular as a simple
and efficient paradigm for large-scale data processing. One
of the main reasons for its popularity is the availability of a
production-level open source implementation, Hadoop, writ-
ten in Java. There is considerable interest, however, in tools
that enable Python programmers to access the framework,
due to the language’s high popularity. Here we present a
Python package that provides an API for both the MapReduce
and the distributed file system components of Hadoop, and
show its advantages with respect to the other available solu-
tions for Hadoop Python programming, Jython and Hadoop
Streaming.
Categories and Subject Descriptors
D.3.3 [Programming Languages]: Language Constructs
and Features—Modules, packages
1. INTRODUCTION
In the past few years, MapReduce [16] has become increas-
ingly popular, both commercially and academically, as a
simple and efficient paradigm for large-scale data process-
ing. One of the main reasons for its popularity is the avail-
ability of a production-level open source implementation,
Hadoop [5], which also includes a distributed file system,
HDFS, inspired by the Google File System [17]. Hadoop, a
top-level Apache project, scales up to thousands of comput-
ing nodes and is able to store and process data on the order
of petabytes. It is widely used across a large number
of organizations [2], most notably Yahoo, which is also the
largest contributor [7]. It is also included as a feature by
cloud computing environments such as Amazon Web Ser-
vices [1].
Hadoop is fully written in Java, and provides Java APIs to
interact with both MapReduce and HDFS. However, pro-
grammers are not limited to Java for application writing.
The Hadoop Streaming library (included in the Hadoop dis-
tribution), for instance, allows the user to provide the map
and reduce logic through executable scripts, according to a
fixed text protocol. In principle, this allows applications to
be written in any programming language, but with restrictions
on both the set of available framework features and the
format of the data to be processed. Hadoop also in-
cludes a C library to interact with HDFS (interfaced with
the Java code through JNI) and a C++ MapReduce API
which takes advantage of the Hadoop Pipes package. Pipes
splits the application-specific C++ code into a separate pro-
cess, exchanging serialized objects with the framework via
a socket. As a consequence, a C++ application is able to
process any type of input data and has access to a large sub-
set of the MapReduce components. Hadoop’s standard dis-
tribution also includes Python examples that are, however,
meant to be run on Jython [13], a Java implementation of
the language which has several limitations compared to the
official C implementation (CPython).
In this work, we focused on building Python APIs for both
the MapReduce and the HDFS parts of Hadoop. Python is
an extremely popular programming language characterized
by very high-level data structures and good performance.
Its huge standard library, complemented by countless third-
party packages, allows it to handle practically any applica-
tion domain. Being able to write MapReduce applications
in Python constitutes, therefore, a major advantage for any
organization that focuses on rapid application development,
especially when there is a substantial body of internally
developed libraries that could be reused.
The standard and probably most common way to develop
Python Hadoop programs is to either take advantage of
Hadoop Streaming or use Jython. In Hadoop Streaming, an
executable Python script can act as the mapper or reducer,
interacting with the framework through standard input and
standard output. The communication protocol is a simple
text-based one, with newline characters as record delimiters
and tabs as key/value separators. Therefore, it cannot process
arbitrary data streams, and the user directly controls only
the map and reduce parts (i.e., it is not possible to write a
RecordReader or Partitioner). Jython is a Java implementation
of the Python language, which allows a Python programmer
to import and use Java packages. This, of course, allows
access to the same features available to Java. However, this
comes at a cost to the Python programmer: Jython is typ-
ically, at any given time, one or more releases older than
CPython, it does not implement the full range of standard
library modules, and does not allow the use of modules
written as C/C++ extensions. Existing Python libraries can
therefore be used with Jython on Hadoop only if they meet
these restrictions. Moreover, the majority
of publicly available third-party packages are not compati-
ble with Jython, most notably the numerical computation
libraries (e.g., numpy [10]) that constitute an indispensable
complement to Python for scientific programming.
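To make the Streaming constraints concrete, the following sketch
(ours, for illustration only; the script names are placeholders) shows
a word count mapper and reducer written against the text protocol
described above, reading newline-delimited records from standard
input and emitting tab-separated key/value pairs on standard output:

# streaming_wc_mapper.py (illustrative sketch): read text lines from
# standard input and emit one "word<TAB>1" pair per word.
import sys

for line in sys.stdin:
    for word in line.split():
        print "%s\t%d" % (word, 1)

# streaming_wc_reducer.py (illustrative sketch): Streaming delivers the
# intermediate pairs sorted by key, so counts can be accumulated until
# the key changes.
import sys

current_key, count = None, 0
for line in sys.stdin:
    key, value = line.rstrip("\n").split("\t", 1)
    if key != current_key:
        if current_key is not None:
            print "%s\t%d" % (current_key, count)
        current_key, count = key, 0
    count += int(value)
if current_key is not None:
    print "%s\t%d" % (current_key, count)

Note that the user never sees a RecordReader, a Partitioner or the
counters: the only interface to the framework is the key/value text
stream itself.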
Our main goal was to make as many as possible of the
Hadoop features available to Java accessible from Python,
while allowing compact CPython code development with
little impact on performance. Since the Pipes/C++ API
seemed to meet these requirements well, we developed a
Python package, Pydoop, by wrapping its C++ code with
Boost.Python [15]. We also wrapped the C libhdfs code to
make HDFS operations available to Python. To evaluate
the package in terms of performance, we ran a series of
tests comparing Pydoop with the other two solutions for
writing Python applications on Hadoop, Jython and Hadoop
Streaming (with Python scripts), and also with Java and
C++, to give a general idea of how much performance loss
is to be expected when choosing Python as the programming
language for Hadoop. We purposely ran a very simple
application in order to focus on the interaction with the
Java core framework: any application with a nontrivial
amount of computation performed inside the mapper and/or
reducer would likely run faster if written in C++, independently
of how it interfaces with the framework. Pydoop is
currently being used in production
for bioinformatics applications and other purposes [18]. It
currently supports Python 2.5 and Python 2.6.
The rest of the paper is organized as follows. After dis-
cussing related work, we introduce Pydoop’s architecture
and features; we then present a performance-wise compar-
ison of our implementation with the other ones; finally, we
give our conclusions and plans for future work.
2. RELATED WORK
Efforts aimed at easing Python programming on Hadoop in-
clude Happy [6] and Dumbo [4]. These frameworks, rather
than providing a Java-like Python API for Hadoop, are
focused on building high-level wrappers that hide all job
creation and submission details from the user. As far as
MapReduce programming is concerned, they are built upon,
respectively, Jython and Hadoop Streaming, and thus they
suffer from the same limitations.
Existing non-Java HDFS APIs use Thrift [14] to make HDFS
calls available to other languages [8] (including Python) by
instantiating a Thrift server that acts as a gateway to HDFS.
In contrast, Pydoop’s HDFS module, being built as a wrap-
per around the C libhdfs code, is specific to Python but does
not require a server to communicate with HDFS.
Finally, there are several non-Hadoop MapReduce implementations
that allow applications to be written in languages other
than Java. These include Starfish [12] (Ruby), Octopy [11]
(Python) and Disco [3] (Python, framework written in Er-
lang). Since Hadoop is still the most popular and widely
deployed implementation, however, we expect our Python
bindings to be useful for the majority of MapReduce pro-
grammers.
Figure 1: Hadoop Pipes data flows. Hadoop commu-
nicates with user-supplied executables by means of
a specialized protocol. Almost all components can
be delegated to the user-supplied executables, but,
at a minimum, it is necessary to provide the Mapper
and Reducer classes.
3. ARCHITECTURE
3.1 MapReduce Jobs in Hadoop
MapReduce [16] is a distributed computing paradigm tai-
lored for the analysis of very large datasets. It operates on
an input key/value pair stream that is converted into a set
of intermediate key/value pairs by a user-defined map func-
tion; the user also provides a reduce function that merges
together all intermediate values associated with a given key
to yield the final result. In Hadoop, MapReduce is im-
plemented according to a master-slave model: the master,
which performs task dispatching and overall job monitoring,
is called Job Tracker; the slaves, which perform the actual
work, are called Task Trackers.
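As a purely illustrative, framework-independent sketch (not Hadoop
code), the data flow just described can be summarized as follows: a
map function emits intermediate key/value pairs for each input record,
intermediate values are grouped by key, and a reduce function merges
each group into the final result.

# Illustrative sketch of the MapReduce data flow on in-memory data.
def run_mapreduce(records, map_fn, reduce_fn):
    intermediate = {}
    for key, value in records:
        for k, v in map_fn(key, value):
            intermediate.setdefault(k, []).append(v)   # group by key
    results = []
    for k in sorted(intermediate):
        results.extend(reduce_fn(k, intermediate[k]))  # merge values
    return results

# Example: total bytes served per URL from (record_no, (url, size)) pairs.
records = [(0, ("/index", 120)), (1, ("/img", 540)), (2, ("/index", 80))]
print run_mapreduce(
    records,
    map_fn=lambda key, value: [value],
    reduce_fn=lambda url, sizes: [(url, sum(sizes))],
)  # [('/img', 540), ('/index', 200)]

In Hadoop, of course, the grouping and sorting step is performed by
the framework across the cluster rather than in memory.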
A client launching a job first creates a JobConf object, whose
role is to set required job parameters (the Mapper and Reducer
classes, input and output paths) and optional ones (e.g.,
the number of mappers and/or reducers). The JobConf is
passed on to the JobClient, which divides input data into
InputSplits and sends job data to the Job Tracker. Task
Trackers periodically contact the Job Tracker for work and
launch tasks in separate Java processes. Feedback is pro-
vided in two ways: by incrementing a counter associated with
some application-related variable or by sending a status re-
port (an arbitrary text message). The RecordReader class
is responsible for reading InputSplits and providing a record-
oriented view to the Mapper. Users can optionally write
their own RecordReader (the default one yields a stream of
text lines). Other key components are the Partitioner, which
assigns key/value pairs to reducers, and the RecordWriter,
which writes output data to files. Again, these are option-
ally written by the users (the defaults are, respectively, to
partition based on a hash value of the key and to write tab-
separated output key/value pairs, one per line).
Figure 2: Integration of Pydoop with C++. In
Pipes, method calls flow from the framework
through the C++ and the Pydoop API, ultimately
reaching user-defined classes; Python objects are
wrapped by Boost.Python and returned to the
framework. In the HDFS wrapper, instead, func-
tion calls are initiated by Pydoop.
3.2 Hadoop Pipes
Fig. 1 shows data flows in Hadoop Pipes. Hadoop uses a
specialized class of tasks, Pipes tasks, to communicate with
user-supplied executables by means of a protocol that uses
persistent socket connections to exchange serialized objects.
The C++ application provides a factory that is used by the
framework to create the various components it needs (Map-
per, Reducer, RecordReader, Partitioner...). Almost all
Hadoop framework components can be overridden by C++
implementations, but, at a minimum, the factory should pro-
vide Mapper and Reducer object creation.
Fig. 2 shows integration of Pydoop with the C++ code.
In the Pipes wrapper, method calls flow from the frame-
work through the C++ code and the Pydoop API, ulti-
mately reaching user-defined classes; Python objects result-
ing from these calls are returned to the framework wrapped
in Boost.Python structures. In the HDFS wrapper, the con-
trol flow is inverted: function calls are initiated by Pydoop
and translated into their C equivalent by the Boost.Python
wrapper; resulting objects are wrapped back and presented
as Python objects to the application level.
4. FEATURES
Pydoop allows the development of full-fledged MapReduce
applications with HDFS access. Its key features are: access
to most MapReduce application components (Mapper, Reducer,
RecordReader, RecordWriter, Partitioner); access to the
context objects passed by the framework, which allow the
application to get JobConf parameters, set counters and report
status; a programming style similar to that of the Java and
C++ APIs, where developers define classes that are instantiated
by the framework and whose methods are called by the
framework (compare this to the Streaming approach, where
the entire key/value stream must be handled manually); a
CPython implementation, so that any Python module can be
used, either pure Python or a C/C++ extension (this is not
possible with Jython); and HDFS access from Python.
>>> import os
>>> from pydoop.hdfs import hdfs
>>> fs = hdfs("localhost", 9000)
>>> fs.open_file("f",os.O_WRONLY).write(open("f").read())
Figure 3: A compact HDFS usage example. In this
case, a local file is copied to HDFS.
from pydoop.hdfs import hdfs

MB = float(2**20)

def treewalker(fs, root_info):
    yield root_info
    if root_info["kind"] == "directory":
        for info in fs.list_directory(root_info["name"]):
            for item in treewalker(fs, info):
                yield item

def usage_by_bs(fs, root):
    stats = {}
    root_info = fs.get_path_info(root)
    for info in treewalker(fs, root_info):
        if info["kind"] == "directory":
            continue
        bs = int(info["block_size"])
        size = int(info["size"])
        stats[bs] = stats.get(bs, 0) + size
    return stats

def main(argv):
    fs = hdfs("localhost", 9000)
    root = fs.working_directory()
    for k, v in usage_by_bs(fs, root).iteritems():
        print "%.1f %d" % (k/MB, v)
    fs.close()
Figure 4: A more complex, although somewhat con-
trived, HDFS example. Here, a directory tree is
walked recursively and statistics on file system us-
age by block size are built.
4.1 A Simple HDFS Example
One of the strengths of Python is interactivity. Fig. 3 shows
an almost one-liner that copies a file from the local file sys-
tem to HDFS. Of course, in this case the Hadoop HDFS shell
equivalent would be more compact, but a full API provides
more flexibility.
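(For reference, the shell equivalent of fig. 3 would be something
along the lines of "hadoop fs -put f f", which copies the local file f
to HDFS.)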
The Pydoop HDFS interface is written as a Boost.Python
wrapper around the C libhdfs, itself a JNI wrapping of the
Java code, so it essentially supports the same array of fea-
tures. We also added a few extensions, such as a readline
method for HDFS files, in order to provide the Python user
with an interface that is reasonably close to that of standard
Python file objects.
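For instance, a minimal sketch of line-oriented reading through this
extension (assuming a running HDFS instance on localhost:9000 and an
existing HDFS file named "input.txt", both placeholders) could look
like the following:

# Sketch: scan an HDFS file line by line via the readline extension.
# Host, port and file name are placeholders for illustration.
import os
from pydoop.hdfs import hdfs

fs = hdfs("localhost", 9000)
f = fs.open_file("input.txt", os.O_RDONLY)
line = f.readline()
while line:  # readline returns "" at end of file
    # process the line here (e.g., parse it or count it)
    line = f.readline()
f.close()
fs.close()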
Fig. 4 shows a more detailed, even though somewhat con-
trived, example of a script that walks through a directory
tree and builds statistics of HDFS usage by block size. This
is an example of a useful operation that cannot be performed
with the Hadoop HDFS shell and requires writing only a
small amount of Python code.
from pydoop.pipes import Mapper, Reducer, Factory, runTask

class WordCountMapper(Mapper):

    def map(self, context):
        words = context.getInputValue().split()
        for w in words:
            context.emit(w, "1")

class WordCountReducer(Reducer):

    def reduce(self, context):
        s = 0
        while context.nextValue():
            s += int(context.getInputValue())
        context.emit(context.getInputKey(), str(s))

runTask(Factory(WordCountMapper, WordCountReducer))
Figure 5: The simplest implementation of the classic
word count example in Pydoop.
4.2 A Simple MapReduce Example
Fig. 5 shows the simplest implementation of the classic
word count example in Pydoop. All communication with the
framework is handled through the context object. Specifi-
cally, through the context, Mapper objects get input key/value
pairs (in this case the key, equal to the byte offset within the
input file, is not needed) and emit intermediate key/value
pairs; reducers get intermediate keys along with their asso-
ciated set of values and emit output key/value pairs.
Fig. 6 shows how to include counter and status updates in
the Mapper and Reducer. Counters are defined by the user:
they are usually associated with relevant application param-
eters (in this case, the mapper counts input words and the
reducer counts output words). Status updates are simply
arbitrary text messages that the application reports back to
the framework. As shown in the code snippet, communi-
cation of counter and status updates happens through the
context object. Fig. 7 shows how to implement a Recor-
dReader for the word count application. The RecordReader
processes the InputSplit, a raw chunk of bytes from an input
file, and divides it into key/value pairs to be fed to the Map-
per. In this example, we show a Python reimplementation of
Hadoop’s default RecordReader, where keys are byte offsets
with respect to the whole file and values are text lines: this
RecordReader is therefore not specific to word count (al-
though it is the one that the word count Mapper expects).
Note that we are using the readline method we added to
Pydoop HDFS file objects.
Finally, fig. 8 shows how to implement the RecordWriter
and Partitioner for the word count application. Again, these
are Python versions of the corresponding standard general-
purpose Hadoop components, so they are actually not spe-
cific to word count. The RecordWriter is responsible for
writing key/value pairs to output files: the standard behavior,
which is replicated here, is to write one key/value pair per line,
separated by a configurable separator that defaults to the
tab character. This example also shows how to use the Job-
Conf object to retrieve configuration parameters: in this
case we are reading standard Hadoop parameters, but an
application is free to define any number of arbitrary options
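For reference, a hash-based Partitioner along the lines of the default
behavior recalled in section 3.1 can be sketched as follows; the import
and the partition method signature (key and number of reducers, as in
the C++ Pipes interface) are our assumptions and may differ in detail
from Pydoop's actual API:

# Sketch of a hash-based Partitioner; the import and the method
# signature are assumed by analogy with the C++ Pipes interface.
from pydoop.pipes import Partitioner

class WordCountPartitioner(Partitioner):

    def partition(self, key, num_of_reduces):
        # Python's % always returns a value in [0, num_of_reduces),
        # even for negative hashes, so this is a valid reducer index.
        return hash(key) % num_of_reduces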
class WordCountMapper(Mapper):

    def __init__(self, context):
        super(WordCountMapper, self).__init__(context)
        context.setStatus("initializing")
        self.inputWords = context.getCounter(WC, INPUT_WORDS)

    def map(self, context):
        k = context.getInputKey()
        words = context.getInputValue().split()
        for w in words:
            context.emit(w, "1")
        context.incrementCounter(self.inputWords, len(words))

class WordCountReducer(Reducer):

    def __init__(self, context):
        super(WordCountReducer, self).__init__(context)
        context.setStatus("initializing")
        self.outputWords = context.getCounter(WC, OUTPUT_WORDS)

    def reduce(self, context):
        s = 0
        while context.nextValue():
            s += int(context.getInputValue())
        context.emit(context.getInputKey(), str(s))
        context.incrementCounter(self.outputWords, 1)
Figure 6: A word count implementation that in-
cludes counters and status updates: through these,
the application can report information on its
progress to the framework.
class WordCountReader(RecordReader):

    def __init__(self, context):
        super(WordCountReader, self).__init__()
        self.isplit = InputSplit(context.getInputSplit())
        self.host, self.port, self.fpath = split_hdfs_path(
            self.isplit.filename)
        self.fs = hdfs(self.host, self.port)
        self.file = self.fs.open_file(self.fpath, os.O_RDONLY)
        self.file.seek(self.isplit.offset)
        self.bytes_read = 0
        if self.isplit.offset > 0:
            # read by reader of previous split
            discarded = self.file.readline()
            self.bytes_read += len(discarded)

    def next(self):
        if self.bytes_read > self.isplit.length:
            return (False, "", "")
        key = struct.pack(">q", self.isplit.offset + self.bytes_read)
        record = self.file.readline()
        if record == "":
            return (False, "", "")
        self.bytes_read += len(record)
        return (True, key, record)

    def getProgress(self):
        return min(float(self.bytes_read)/self.isplit.length, 1.0)
Figure 7: Word Count RecordReader example. The
RecordReader converts the byte-oriented view of
the InputSplit to the record-oriented view needed
by the Mapper. Here, we show some code snippets
from a plug-in replacement of Hadoop’s standard
Java LineRecordReader, where keys are byte offsets
with respect to the whole file and values (records)
are text lines.
class WordCountWriter(RecordWriter):

    def __init__(self, context):
        super(WordCountWriter, self).__init__(context)
        jc = context.getJobConf()
        jc_configure_int(self, jc, "mapred.task.partition", "part")
        jc_configure(self, jc, "mapred.work.output.dir", "outdir")
        jc_co