为了正常的体验网站,请在浏览器设置里面开启Javascript功能!

Pydoop-- a Python MapReduce and HDFS API for Hadoop

2012-09-12 7页 pdf 152KB 120阅读

用户头像

is_502581

暂无简介

举报
Pydoop-- a Python MapReduce and HDFS API for Hadoop Pydoop: a Python MapReduce and HDFS API for Hadoop Simone Leo CRS4 Pula, CA, Italy simone.leo@crs4.it Gianluigi Zanetti CRS4 Pula, CA, Italy gianluigi.zanetti@crs4.it ABSTRACT MapReduce has become increasingly popular as a simple and efficient paradigm for lar...
Pydoop-- a Python MapReduce and HDFS API for Hadoop
Pydoop: a Python MapReduce and HDFS API for Hadoop Simone Leo CRS4 Pula, CA, Italy simone.leo@crs4.it Gianluigi Zanetti CRS4 Pula, CA, Italy gianluigi.zanetti@crs4.it ABSTRACT MapReduce has become increasingly popular as a simple and efficient paradigm for large-scale data processing. One of the main reasons for its popularity is the availability of a production-level open source implementation, Hadoop, writ- ten in Java. There is considerable interest, however, in tools that enable Python programmers to access the framework, due to the language’s high popularity. Here we present a Python package that provides an API for both the MapRe- duce and the distributed file system sections of Hadoop, and show its advantages with respect to the other available solu- tions for Hadoop Python programming, Jython and Hadoop Streaming. Categories and Subject Descriptors D.3.3 [Programming Languages]: Language Constructs and Features—Modules, packages 1. INTRODUCTION In the past few years, MapReduce [16] has become increas- ingly popular, both commercially and academically, as a simple and efficient paradigm for large-scale data process- ing. One of the main reasons for its popularity is the avail- ability of a production-level open source implementation, Hadoop [5], which also includes a distributed file system, HDFS, inspired by the Google File System [17]. Hadoop, a top-level Apache project, scales up to thousands of comput- ing nodes and is able to store and process data in the order of the petabytes. It is widely used across a large number of organizations [2], most notably Yahoo, which is also the largest contributor [7]. It is also included as a feature by cloud computing environments such as Amazon Web Ser- vices [1]. Hadoop is fully written in Java, and provides Java APIs to interact with both MapReduce and HDFS. However, pro- grammers are not limited to Java for application writing. The Hadoop Streaming library (included in the Hadoop dis- tribution), for instance, allows the user to provide the map and reduce logic through executable scripts, according to a fixed text protocol. This, in principle, allows to write an application using any programming language, but with re- strictions both for what concerns the set of available features and the format of the data to be processed. Hadoop also in- cludes a C library to interact with HDFS (interfaced with the Java code through JNI) and a C++ MapReduce API which takes advantage of the Hadoop Pipes package. Pipes splits the application-specific C++ code into a separate pro- cess, exchanging serialized objects with the framework via a socket. As a consequence, a C++ application is able to process any type of input data and has access to a large sub- set of the MapReduce components. Hadoop’s standard dis- tribution also includes Python examples that are, however, meant to be run on Jython [13], a Java implementation of the language which has several limitations compared to the official C implementation (CPython). In this work, we focused on building Python APIs for both the MapReduce and the HDFS parts of Hadoop. Python is an extremely popular programming language characterized by very high level data structures and good performances. Its huge standard library, complemented by countless third- party packages, allows it to handle practically any applica- tion domain. Being able to write MapReduce applications in Python constitutes, therefore, a major advantage for any organization that focuses on rapid application development, especially when there is a consistent amount of internally developed libraries that could be reused. The standard and probably most common way to develop Python Hadoop programs is to either take advantage of Hadoop Streaming or use Jython. In Hadoop Streaming, an executable Python script can act as the mapper or reducer, interacting with the framework through standard input and standard output. The communication protocol is a simple text-based one, with new line characters as record delimiters and tabs as key/value separators. Therefore, it cannot pro- cess arbitrary data streams, and the user directly controls only the map and reduce parts (i.e., you can’t write a Recor- dReader or Partitioner). Jython is a Java implementation of the Python language, which allows a Python programmer to import and use Java packages. This, of course, allows access to the same features available to Java. However, this comes at a cost to the Python programmer: Jython is typ- ically, at any given time, one or more releases older than CPython, it does not implement the full range of standard Permission to make digital or hard copies of all or part of this work for personal or classroom use is granted without fee provided that copies are not made or distributed for profit or commercial advantage and that copies bear this notice and the full citation on the first page. To copy otherwise, to republish, to post on servers or to redistribute to lists, requires prior specific permission and/or a fee. HPDC'10, June 20–25, 2010, Chicago, Illinois, USA. Copyright 2010 ACM 978-1-60558-942-8/10/06 ...$10.00. 819 library modules and does not allow to use modules writ- ten as C/C++ extensions. Usability of existing Python li- braries for Hadoop with Jython is therefore only possible if they meet the above restrictions. Moreover, the majority of publicly available third-party packages are not compati- ble with Jython, most notably the numerical computation libraries (e.g., numpy [10]) that constitute an indispensable complement to Python for scientific programming. Our main goal was to provide access to as many as possi- ble of the Hadoop features available to Java while allowing compact CPython code development with little impact on performances. Since the Pipes/C++ API seemed to meet these requirements well, we developed a Python package, Pydoop, by wrapping its C++ code with Boost.Python [15]. We also wrapped the C libhdfs code to made HDFS opera- tions available to Python. To evaluate the package in terms of performances, we ran a series of tests, purposely running a very simple application in order to focus on interaction with the Java core framework (any application with a nontrivial amount of computation performed inside the mapper and/or reducer would likely run faster if written in C++, indepen- dently of how it interfaces with the framework), where we compare Pydoop with the other two solutions for Python ap- plication writing in Hadoop, Jython and Hadoop Streaming (with Python scripts), and also to Java and C++ to give a general idea of how much performance loss is to be ex- pected when choosing Python as the programming language for Hadoop. Pydoop is currently being used in production for bioinformatics applications and other purposes [18]. It currently supports Python 2.5 and Python 2.6. The rest of the paper is organized as follows. After dis- cussing related work, we introduce Pydoop’s architecture and features; we then present a performance-wise compar- ison of our implementation with the other ones; finally, we give our conclusions and plans for future work. 2. RELATED WORK Efforts aimed at easing Python programming on Hadoop in- clude Happy [6] and Dumbo [4]. These frameworks, rather than providing a Java-like Python API for Hadoop, are focused on building high-level wrappers that hide all job creation and submission details from the user. As far as MapReduce programming is concerned, they are built upon, respectively, Jython and Hadoop Streaming, and thus they suffer from the same limitations. Existing non-Java HDFS APIs use Thrift [14] to make HDFS calls available to other languages [8] (including Python) by instantiating a Thrift server that acts as a gateway to HDFS. In contrast, Pydoop’s HDFS module, being built as a wrap- per around the C libhdfs code, is specific to Python but does not require a server to communicate with HDFS. Finally, there are several non-Hadoop Mapreduce implemen- tations available to write applications in languages other than Java. These include Starfish [12] (Ruby), Octopy [11] (Python) and Disco [3] (Python, framework written in Er- lang). Since Hadoop is still the most popular and widely deployed implementation, however, we expect our Python bindings to be useful for the majority of MapReduce pro- grammers. Figure 1: Hadoop pipes data flows. Hadoop commu- nicates with user-supplied executables by means of a specialized protocol. Almost all components can be delegated to the user-supplied executables, but, at a minimum, it is necessary to provide the Mapper and Reducer classes. 3. ARCHITECTURE 3.1 MapReduce Jobs in Hadoop MapReduce [16] is a distributed computing paradigm tai- lored for the analysis of very large datasets. It operates on a input key/value pair stream that is converted into a set of intermediate key/value pairs by a user-defined map func- tion; the user also provides a reduce function that merges together all intermediate values associated to a given key to yield the final result. In Hadoop, MapReduce is im- plemented according to a master-slave model: the master, which performs task dispatching and overall job monitoring, is called Job Tracker; the slaves, which perform the actual work, are called Task Trackers. A client launching a job first creates a JobConf object whose role is to set required (Mapper and Reducer classes, in- put and output paths) and optional job parameters (e.g., the number of mappers and/or reducers). The JobConf is passed on to the JobClient, which divides input data into InputSplits and sends job data to the Job Tracker. Task Trackers periodically contact the Job Tracker for work and launch tasks in separate Java processes. Feedback is pro- vided in two ways: by incrementing a counter associated to some application-related variable or by sending a status re- port (an arbitrary text message). The RecordReader class is responsible for reading InputSplits and provide a record- oriented view to the Mapper. Users can optionally write their own RecordReader (the default one yields a stream of text lines). Other key components are the Partitioner, which assigns key/value pairs to reducers, and the RecordWriter, which writes output data to files. Again, these are option- ally written by the users (the defaults are, respectively, to partition based on a hash value of the key and to write tab- separated output key/value pairs, one per line). 820 Figure 2: Integration of Pydoop with C++. In Pipes, method calls flow from the framework through the C++ and the Pydoop API, ultimately reaching user-defined classes; Python objects are wrapped by Boost.Python and returned to the framework. In the HDFS wrapper, instead, func- tion calls are initiated by Pydoop. 3.2 Hadoop Pipes Fig. 1 shows data flows in Hadoop Pipes. Hadoop uses a specialized class of tasks, Pipes tasks, to communicate with user-supplied executables by means of a protocol that uses persistent socket connections to exchange serialized objects. The C++ application provides a factory that is used by the framework to create the various components it needs (Map- per, Reducer, RecordReader, Partitioner . . . ). Almost all Hadoop framework components can be overridden by C++ implementations, but, at a minimum, the factory should pro- vide Mapper and Reducer object creation. Fig. 2 shows integration of Pydoop with the C++ code. In the Pipes wrapper, method calls flow from the frame- work through the C++ code and the Pydoop API, ulti- mately reaching user-defined classes; Python objects result- ing from these calls are returned to the framework wrapped in Boost.Python structures. In the HDFS wrapper, the con- trol flow is inverted: function calls are initiated by Pydoop and translated into their C equivalent by the Boost.Python wrapper; resulting objects are wrapped back and presented as Python objects to the application level. 4. FEATURES Pydoop allows to develop full-fledged MapReduce applica- tions with HDFS access. Its key features are: access to most Mapreduce application components: Mapper, Reducer, Recor- dReader, RecordWriter, Partitioner; access to context ob- ject passed by the framework that allow to get JobConf parameters, set counters and report status; programming style similar to the Java and C++ APIs: developers define classes that are instantiated by the framework, with methods also called by the framework (compare this to the Stream- ing approach, where you have to manually handle the entire key/value stream); CPython implementation: any Python module can be used, either pure Python or C/C++ exten- sion (this is not possible with Jython); HDFS access from Python. >>> import os >>> from pydoop.hdfs import hdfs >>> fs = hdfs("localhost", 9000) >>> fs.open_file("f",os.O_WRONLY).write(open("f").read()) Figure 3: A compact HDFS usage example. In this case, a local file is copied to HDFS. from pydoop.hdfs import hdfs MB = float(2**20) def treewalker(fs, root_info): yield root_info if root_info["kind"] == "directory": for info in fs.list_directory(root_info["name"]): for item in treewalker(fs, info): yield item def usage_by_bs(fs, root): stats = {} root_info = fs.get_path_info(root) for info in treewalker(fs, root_info): if info["kind"] == "directory": continue bs = int(info["block_size"]) size = int(info["size"]) stats[bs] = stats.get(bs, 0) + size return stats def main(argv): fs = hdfs("localhost", 9000) root = fs.working_directory() for k, v in usage_by_bs(fs, root).iteritems(): print "%.1f %d" % (k/MB, v) fs.close() Figure 4: A more complex, although somewhat con- trived, HDFS example. Here, a directory tree is walked recursively and statistics on file system us- age by block size are built. 4.1 A Simple HDFS Example One of the strengths of Python is interactivity. Fig. 3 shows an almost one-liner that copies a file from the local file sys- tem to HDFS. Of course, in this case the Hadoop HDFS shell equivalent would be more compact, but a full API provides more flexibility. The Pydoop HDFS interface is written as a Boost.Python wrapper around the C libhdfs, itself a JNI wrapping of the Java code, so it essentially supports the same array of fea- tures. We also added few extensions, such as a readline method for HDFS files, in order to provide the Python user with an interface that is reasonably close to that of standard Python file objects. Fig. 4 shows a more detailed, even though somewhat con- trived, example of a script that walks through a directory tree and builds statistics of HDFS usage by block size. This is an example of a useful operation that cannot be performed with the Hadoop HDFS shell and requires writing only a small amount of Python code. 821 from pydoop.pipes import Mapper, Reducer, Factory, runTask class WordCountMapper(Mapper): def map(self, context): words = context.getInputValue().split() for w in words: context.emit(w, "1") class WordCountReducer(Reducer): def reduce(self, context): s = 0 while context.nextValue(): s += int(context.getInputValue()) context.emit(context.getInputKey(), str(s)) runTask(Factory(WordCountMapper, WordCountReducer)) Figure 5: The simplest implementation of the classic word count example in Pydoop. 4.2 A Simple MapReduce Example Fig. 5 shows the simplest implementation of the classic word count example in Pydoop. All communication with the framework is handled through the context object. Specifi- cally, through the context, Mapper objects get input key/value pairs (in this case the key, equal to the byte offset of the input file, is not needed) and emit intermediate key/value pairs; reducers get intermediate keys along with their asso- ciated set of values and emit output key/value pairs. Fig. 6 shows how to include counter and status updates in the Mapper and Reducer. Counters are defined by the user: they are usually associated with relevant application param- eters (in this case, the mapper counts input words and the reducer counts output words). Status updates are simply arbitrary text messages that the application reports back to the framework. As shown in the code snippet, communi- cation of counter and status updates happens through the context object. Fig. 7 shows how to implement a Recor- dReader for the word count application. The RecordReader processes the InputSplit, a raw bytes chunk from an input file, and divides it into key/value pairs to be fed to the Map- per. In this example, we show a Python reimplementation of Hadoop’s default RecordReader, where keys are byte offsets with respect to the whole file and values are text lines: this RecordReader is therefore not specific to word count (al- though it is the one that the word count Mapper expects). Note that we are using the readline method we added to Pydoop HDFS file objects. Finally, fig. 8 shows how to implement the RecordWriter and Partitioner for the word count application. Again, these are Python versions of the corresponding standard general- purpose Hadoop components, so they are actually not spe- cific to word count. The RecordWriter is responsible for writing key/value pairs to output files: the standard, which is replicated here, is to write one key/value pair per line, separated by a configurable separator that defaults to the tab character. This example also shows how to use the Job- Conf object to retrieve configuration parameters: in this case we are reading standard Hadoop parameters, but an application is free to define any number of arbitrary options class WordCountMapper(Mapper): def __init__(self, context): super(WordCountMapper, self).__init__(context) context.setStatus("initializing") self.inputWords = context.getCounter(WC, INPUT_WORDS) def map(self, context): k = context.getInputKey() words = context.getInputValue().split() for w in words: context.emit(w, "1") context.incrementCounter(self.inputWords, len(words)) class WordCountReducer(Reducer): def __init__(self, context): super(WordCountReducer, self).__init__(context) context.setStatus("initializing") self.outputWords = context.getCounter(WC, OUTPUT_WORDS) def reduce(self, context): s = 0 while context.nextValue(): s += int(context.getInputValue()) context.emit(context.getInputKey(), str(s)) context.incrementCounter(self.outputWords, 1) Figure 6: A word count implementation that in- cludes counters and status updates: through these, the application writer can send information on its progress to the framework. class WordCountReader(RecordReader): def __init__(self, context): super(WordCountReader, self).__init__() self.isplit = InputSplit(context.getInputSplit()) self.host, self.port, self.fpath = split_hdfs_path( self.isplit.filename) self.fs = hdfs(self.host, self.port) self.file = self.fs.open_file(self.fpath, os.O_RDONLY) self.file.seek(self.isplit.offset) self.bytes_read = 0 if self.isplit.offset > 0: # read by reader of previous split discarded = self.file.readline() self.bytes_read += len(discarded) def next(self): if self.bytes_read > self.isplit.length: return (False, "", "") key = struct.pack(">q", self.isplit.offset+self.bytes_read) record = self.file.readline() if record == "": return (False, "", "") self.bytes_read += len(record) return (True, key, record) def getProgress(self): return min(float(self.bytes_read)/self.isplit.length, 1.0) Figure 7: Word Count RecordReader example. The RecordReader converts the byte-oriented view of the InputSplit to the record-oriented view needed by the Mapper. Here, we show some code snippets from a plug-in replacement of Hadoop’s standard Java LineRecordReader, where keys are byte offsets with respect to the whole file and values (records) are text lines. 822 class WordCountWriter(RecordWriter): def __init__(self, context): super(WordCountWriter, self).__init__(context) jc = context.getJobConf() jc_configure_int(self, jc, "mapred.task.partition", "part") jc_configure(self, jc, "mapred.work.output.dir", "outdir") jc_co
/
本文档为【Pydoop-- a Python MapReduce and HDFS API for Hadoop】,请使用软件OFFICE或WPS软件打开。作品中的文字与图均可以修改和编辑, 图片更改请在作品中右键图片并更换,文字修改请直接点击文字进行修改,也可以新增和删除文档中的内容。
[版权声明] 本站所有资料为用户分享产生,若发现您的权利被侵害,请联系客服邮件isharekefu@iask.cn,我们尽快处理。 本作品所展示的图片、画像、字体、音乐的版权可能需版权方额外授权,请谨慎使用。 网站提供的党政主题相关内容(国旗、国徽、党徽..)目的在于配合国家政策宣传,仅限个人学习分享使用,禁止用于任何广告和商用目的。

历史搜索

    清空历史搜索