MapReduce is a framework, a pattern, and a programming paradigm that allows us to carry out computations over several terabytes of data with remarkable speed. When it comes to massive-scale architecture and huge amounts of data, with built-in fault tolerance, there is little that can match it. Yet, when you come down to defining it, a MapReduce program is basically just a combination of two functions: a map function and a reduce function. This reflects not only how simple the framework is from the programmer's point of view, but also the sheer power and flexibility of the code that runs under the hood.
It was Google that first introduced MapReduce as a framework, and it is used to build the indexes for Google Web search! It handles many petabytes of data every day, with programs executed on large-scale clusters. You might not notice much of a performance gain when processing a limited amount of data on a limited number of machines, but if you dream of operating at that scale one day (and of course you do!), then this is the way to go.
However, this should not lead you to think that MapReduce is effective only for large datasets and data-intensive computations; what is arguably more important is that programmers without any experience of parallel and distributed systems can easily make use of distributed resources through MapReduce programming.
Here I would like to point out that there is a difference between distributed computing and parallel computing. Although the two terms are often used interchangeably, parallel computing generally refers to a large number of processors sharing the same memory or the same disks, while distributed computing refers to a cluster of nodes, where each node is an independent unit with its own memory and disk.
These nodes could be any computers or laptops you find lying around. This is why the days of dedicated high-performance supercomputers are fading: the computational power of a carefully tuned and configured cluster of such machines can match, or even exceed, the performance of many supercomputers you might have heard of. Another advantage of such clusters is their near-linear scalability: you don't have to buy a bigger and better server to get increased performance, you just add more machines.
Use cases
MapReduce is a good fit for problems that can easily be divided into a number of smaller pieces, which can then be solved independently. The data is ideally (but not necessarily) in the form of lists, or just a huge chunk of raw information waiting to be processed, be it log files, geospatial data, genetic data for biochemistry, or (of course!) Web pages to be indexed by search engines. Whatever it might be, it is a necessary condition that the final output must not depend on the order in which the data is processed.
The use of MapReduce is on the rise in Web analytics, data mining and various other housekeeping functions, often in combination with other kinds of databases. It is also being used in more complex fields, ranging from graphics processing on Nvidia's GPUs to animation and machine learning algorithms. I hope all of you have seen the movie Shrek. Guess what? Its frames were rendered on a huge distributed render farm, using the same divide-the-work-and-combine-the-results approach that MapReduce formalises.
Also, Amazon Elastic MapReduce has given this paradigm far more exposure than ever before, and made it almost a one-click process. Amazon hosts the Hadoop framework running on Amazon EC2 and Amazon S3. It allows MapReduce computations without the headache of setting up servers and clusters, making them available as a resource that can easily be rented and charged for by the hour.
How to think in terms of ‘MapReduce’
MapReduce borrows heavily from functional programming languages such as Lisp, which are built around processing lists. So I suggest you get your functional programming concepts clear before trying your hand at MapReduce.
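If those concepts are hazy, the core idea is small: map applies a function to every element of a list, and reduce (or fold) collapses the resulting list into a single value. Purely as an illustration (any functional language shows the same thing), here is that construct expressed with Java's stream API; the class name and data are made up for the example:

import java.util.Arrays;
import java.util.List;

public class MapReduceIdea {
    public static void main(String[] args) {
        List<String> words = Arrays.asList("map", "reduce", "cluster");
        // map: transform every element of the list (here, each word to its length)
        // reduce: fold the transformed values into a single result (their sum)
        int totalLetters = words.stream()
                                .map(String::length)
                                .reduce(0, Integer::sum);
        System.out.println(totalLetters);   // prints 16
    }
}

MapReduce takes exactly this pattern and spreads the map and reduce steps across the machines of a cluster.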
Although MapReduce gives programmers with no experience in distributed systems an easy interface, the programmer does have to keep in mind the bandwidth available in the cluster and the amount of data being passed around. Carefully implemented MapReduce algorithms can go a long way in improving the performance of a particular cluster. Also, a MapReduce operation is a batch process, as opposed to SQL, which offers an interactive, query-like interface. To solve a problem with MapReduce, it has to be divided into two functions, i.e., map and reduce:
- The map function reads a stream of input records and processes each value in the sequence. It takes the initial set of key-value pairs and, in turn, produces intermediate key-value pairs to be passed on to the reducer.
- The reduce function combines the processed data generated by the mappers. Its job is to take the set of intermediate key-value pairs for a given key and output a key-value pair that aggregates all the values it received from the mappers.
Combiner functions are sometimes used to combine data on the mapper node before it goes to the reducer. In most cases, the code for the combiner and the reducer is the same. This can save a lot of data-transfer bandwidth and improve efficiency noticeably. It does not mean a combiner should be used in every case, though: if there is not much data to combine, it just uses up processing power that could be put to better use.
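To make this concrete, here is a minimal sketch of the classic word-count mapper and reducer, written against Hadoop's Java API (the org.apache.hadoop.mapreduce classes we will meet again below). The class names are my own, and the code is meant to show the shape of the two functions rather than to be a tuned implementation; note that the reducer is written so that it can also be registered as a combiner.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Mapper: turns its slice of the input into intermediate (word, 1) pairs.
public class WordCountMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        StringTokenizer tokens = new StringTokenizer(line.toString());
        while (tokens.hasMoreTokens()) {
            word.set(tokens.nextToken());
            context.write(word, ONE);          // emit an intermediate key-value pair
        }
    }
}

// Reducer: aggregates all the counts received for a given word. The same
// class can also be set as the combiner, to pre-aggregate on the mapper node.
class WordCountReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable count : counts) {
            sum += count.get();
        }
        context.write(word, new IntWritable(sum));   // final (word, total) pair
    }
}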
There are no limitations on the kind of content that can be passed into a map or a reduce function; it can be anything from simple strings to complex data structures, to which you can, in turn, apply MapReduce again. As noted above, MapReduce is based on the familiar map/reduce construct from functional programming, but describes the methodology in a much more parallelised fashion.
A lot of work also goes into matters such as how the data is distributed among multiple nodes. So the most difficult part is not implementing the functions and writing the code; it is how you carve up and shape your data.
A (very) short tutorial
Hadoop is one of the most popular implementations (and it is open source, too) of the framework Google introduced. There is also a project called Disco, which looks interesting to me: its core is implemented in Erlang, with bits and pieces in Python, which arguably makes it leaner than Hadoop, which is written in Java. Hadoop also tends to be a bit more complex than it needs to be for most users (and rather massive for a framework, at a little over 70 MB for the newer version!), so take a look at the Disco project if you are a Python developer.
Even so, we will stick with Hadoop, since it is one of the most actively developed implementations right now. I will assume that you have already set up Hadoop, preferably version 0.21.0 in pseudo-distributed mode if you don't have access to a cluster. Since the Hadoop website already has in-depth installation documentation, there is no point in repeating it here. Alternatively, I would recommend Cloudera's distribution, which provides Deb and RPM packages for easy installation on major Linux distributions, and even preconfigured VMware images for download. Hadoop provides three basic ways of developing MapReduce applications:
- Java API
- Hadoop streaming
- Hadoop pipes
Java API
This is the native Java API that Hadoop provides. The newer implementation is available in the org.apache.hadoop.mapreduce package, while the older one lives in org.apache.hadoop.mapred. Instead of devoting a full-fledged step-by-step tutorial to developing a Java program with this API, let's just try to run some of the excellent examples already bundled with Hadoop.
The example we are going to take up is the distributed version of the grep command, which takes a text file as input and returns the number of occurrences of the specified word (or regular expression) in it. You can take a look at the source code of the example in the HADOOP_HOME/mapred/src/examples/org/apache/hadoop/examples/Grep.java file, where HADOOP_HOME is the directory in which you have installed Hadoop on your system.
First of all, let’s copy a text file to be searched, from the local filesystem, into HDFS, using the following command in Hadoop’s home directory:
bin/hadoop fs -copyFromLocal path/on/local/filesystem /DFS/path/to/text/file
Now that we have our inputs ready, we can run the MapReduce example using the following command:
bin/hadoop jar hadoop-mapred-examples-0.21.0.jar grep /DFS/path/to/input/file /DFS/path/to/output/Directory keyword
Here, keyword is any word or regular expression that you are trying to find in the text file. After a lot of verbose text has scrolled by and your job is complete, you can take a look at the output:
bin/hadoop fs -cat /DFS/path/to/output/file
The output should look something like what’s shown below, i.e., the keyword accompanied by the number of occurrences:
2 keyword
There are a number of other examples available, including the classic word count, which is almost synonymous with MapReduce tutorials on the Web. HBase also provides a Java API for easier access to HBase tables from MapReduce; these classes are available in the org.apache.hadoop.hbase.mapreduce package. An excellent tutorial on HBase and MapReduce can be found here; go through it if you're interested.
Hadoop streaming
The streaming utility allows you to write MapReduce programs in any language: the mapper and reducer receive their input on standard input and emit their output on standard output. Any executable can be specified as the mapper or the reducer, as long as it can read from and write to these standard streams. For example, the following command in Hadoop's home directory will submit a MapReduce job using the streaming utility:
bin/hadoop jar $HADOOP_HOME/mapred/contrib/streaming/hadoop-0.21.0-streaming.jar \
    -input HDFSpathtoInputDirectory \
    -output HDFSpathtoOutputDirectory \
    -mapper mapperScript \
    -reducer reducerScript \
    -file AdditionalFile \
    -file MoreAdditionalFiles
If you are running a cluster, the mapper and reducer scripts are assumed to be present on all the nodes. The -file argument lets you ship any additional files the mapper or reducer needs to those nodes, including the mapper and reducer scripts themselves if they are not already present everywhere.
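Any language will do for these scripts; purely to stay consistent with the rest of this article, here is a sketch of what a streaming word-count mapper could look like in Java. The only contract it has to honour is reading raw lines on standard input and writing tab-separated key-value pairs on standard output (the class name is hypothetical):

import java.io.BufferedReader;
import java.io.InputStreamReader;

// Streaming mapper: lines in on stdin, "key<TAB>value" lines out on stdout.
// A streaming reducer is written the same way, except that it receives the
// intermediate pairs sorted by key and must detect key boundaries itself.
public class StreamingWordCountMapper {
    public static void main(String[] args) throws Exception {
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        String line;
        while ((line = in.readLine()) != null) {
            for (String word : line.trim().split("\\s+")) {
                if (!word.isEmpty()) {
                    System.out.println(word + "\t1");   // one intermediate pair per word
                }
            }
        }
    }
}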
Hadoop pipes
Hadoop pipes lets you implement the parts of a MapReduce application that need higher performance in numerical calculations in C++. The pipes utility works by establishing a persistent socket connection, with the Java pipes task at one end and the external C++ process at the other.
Other dedicated alternatives and implementations are also available, such as Pydoop for Python and libhdfs for C; these are mostly wrappers, built on JNI. It is worth noting, however, that individual MapReduce jobs are often just one component in a larger workflow that chains, redirects and recurses over MapReduce jobs. Such data extraction and transformation pipelines are usually expressed with higher-level languages or APIs like Pig, Hive and Cascading.
Under the hood
We have just seen how a few lines of code can perform computations on a cluster, distributing the work in a way that would take us days to code manually. Now let's take a brief look at Hadoop MapReduce's job flow, and some background on how it all happens.
The story starts when the driver program submits the job configuration to the JobTracker. As the name suggests, the JobTracker controls the overall progress of the job and monitors the success or failure of each individual task; it typically runs alongside the NameNode of the distributed filesystem. The JobTracker splits the job into individual tasks and submits them to the respective TaskTrackers, which mostly run on the DataNodes themselves, or at least on the same rack as the data.
This placement is possible because HDFS is rack-aware. Each TaskTracker then receives its share of the input data and starts running the map function specified by the configuration. When all the map tasks are complete, the JobTracker asks the TaskTrackers to start running the reduce function. The JobTracker deals with failed and unresponsive tasks by running backup tasks, i.e., multiple copies of the same task on different nodes; whichever node finishes first has its output accepted. When both the map and reduce phases are complete, the JobTracker notifies the client program and writes the output into the specified output directory.
The job flow, as a result, is made up of four main components:
- The Driver Program
- The JobTracker
- The TaskTracker
- The Distributed Filesystem
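To tie the first of these back to code, the driver program is typically just a handful of lines that describe the job and submit it. A rough sketch, reusing the hypothetical word-count classes from earlier and the 0.21-era Job constructor, could look like this:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Driver: builds the job configuration and submits it for execution.
public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCountMapper.class);
        job.setCombinerClass(WordCountReducer.class);   // optional combiner
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));    // input on HDFS
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output on HDFS
        // Submit to the JobTracker and block until all map and reduce tasks finish.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Everything else (splitting the job into tasks, scheduling them close to the data and re-running stragglers) is handled by the framework, as described above.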
Limitations
I have been trumpeting the virtues of MapReduce for quite a while now, so it is only fair to spend some time on its shortcomings. Apart from being touted as the best thing to happen to cloud computing, MapReduce has also been criticised a lot:
- Data-transfer bandwidths between different nodes put a cap on the amount of data that can be passed around in the cluster.
- Not all types of problems can be handled by MapReduce, since a fundamental requirement is that the pieces of data be processable independently of one another, and we cannot exactly determine or control the number or order of map and reduce tasks.
- Perhaps the most important problem in this paradigm is the barrier between the map phase and the reduce phase, since processing cannot move into the reduce phase until all the map tasks have completed.
Last-minute pointers
Before leaving you with your own thoughts, here are a few pointers:
- The HDFS filesystem can also be accessed using the NameNode's Web interface, at http://localhost:50070/.
- File data can also be streamed from DataNodes over HTTP, on port 50075.
- Job progress can be monitored using the MapReduce Web administration console of the JobTracker node, accessible at http://localhost:50030/.
- The jps command can be used to check which Hadoop daemons (Java processes) are up and running.
- The Thrift interface is also an alternative for binding and developing with languages other than Java.
Further reading
- Apache Hadoop Official website
- Disco Project
- Cloudera’s free Hadoop Training lectures
- Google’s lectures and slides on Cluster Computing and MapReduce
- Google’s MapReduce Research Paper
- MapReduce: A major step backwards [PDF]
Reduce complications by mapping complexities!