My interest in Hadoop had been sparked a year ago by the following headline: Up to 100x faster than Hadoop MapReduce. Now, its finally time to explore Apache Spark.
MapReduce expects you to write two programs, a mapper and a reducer. The MapReduce system will try to run the mapper programs on nodes close to the data. The output of various mapper programs will be key value pairs. The MapReduce system will forward the output to various reducer programs, based on the key.
In the Spark environment, you write only one set of code containing both the mapper and reducer code. The framework will distribute and execute the code in a manner that will try to optimise the performance and minimise the movement of data over the network. Besides, the program could include additional mappers and reducers to process the intermediate results.
Installing Spark
You will need to download the code from the Apache Spark site (http://spark.apache.org/) based on the version of Hadoop on your system. Installation involves just unzipping the downloaded file. The documentation will guide you on how to configure it for a cluster, e.g., consisting of three nodes h-mstr, h-slv1 and h-slv2.
Getting a taste of Spark with word count
Lets suppose that you have already created and copied text files in the Hadoop Distributed File System (HDFS) in the directory /user/fedora/docs/.
Start the Python Spark shell, as follows:
$ bin/pyspark
Open the text files in HDFS and run the usual word count example:
>>> data = sc.textFile('hdfs://h_mstr/user/fedora/docs') >>> result = data.flatMap(lambda line:re.sub('[^a-z0-9]+',' ',line.lower()).split()) .map(lambda x:(x,1)) .reduceByKey(lambda a,b:a+b) >>> output=result.collect() >>> output.sort() >>> for w,c in output: >>> print('%s %d'%(w,c))
The shell creates a Spark context, sc. Use it to open the files contained in the HDFS docs directory for the user Fedora.
You use flatMap to split each line into words. In this case, you have converted each line into lower case and replaced all non-alphanumeric characters by spaces, before splitting it into words. Next, map each word into a pair (word, 1).
The mapping is now complete and you can reduce it by keyword, accumulating the count of each word. So far, the response would have been very fast. Spark is lazy and evaluates only when needed, which will be done when you run the collect function. It will return a list of word count pairs, which you may sort and print.
Finding out which files contain a particular word
Lets assume that you have a large number of small text files and you would like to know how frequently a particular word occurs in various files.
Spark has the option to process the whole file rather than one record at a time. Each file is treated as a pair of values the file name and the content.
from pyspark import SparkContext import re # input to wholeFile is a file name, file content pair def wholeFile(x): name=x[0] words = re.sub('[^a-z0-9]+',' ',x[1].lower()).split() return [(word,name) for word in set(words)] sc = SparkContext(appName='WordsInFiles') data = sc.wholeTextFiles('hdfs://h_mstr/user/fedora/docs') word_index = data.flatMap(wholeFile).reduceByKey(lambda a,b:','.join((a,b))) word_index.persist() output=word_index.collect() output.sort() # print the first 10 values for i in range(10): print output[i] explore(word_index) sc.stop()
The function wholeFile returns a list of (word, file name) pairs for unique words. As earlier, the text has been converted to lower case and all special characters replaced by a space.
The reduceByKey function joins all the file names with a comma as the separator. For verification, the first 10 results of the sorted output are printed.
The persist method saves the result of word_index, which is useful if you want to use the same data set again. For example, in the explore method mentioned above, you may want to analyse a list of words and see how many of these words appear in a file.
class filter_keys(): def __init__(self,keys): self.keys = keys def filter(self,x): return x[0] in self.keys def explore_keys(rdd,keys): example = filter_keys(keys) res = rdd.filter(example.filter) \ .flatMap(lambda x:x[1].split(',')) \ .map(lambda x:(x,1)) \ .reduceByKey(lambda a,b:a+b) out=res.collect() out.sort(lambda a,b:-a[1].__cmp__(b[1])) print keys,len(out) print out def explore(rdd): explore_keys(rdd,['linux','fedora','ubuntu']) explore_keys(rdd,['linux','fedora','ubuntu','python'])
The first step is to select only those records that pertain to the keywords. The Filter operation on the data set will do just that. However, we need to pass a list of words as a parameter and this can be achieved by creating the class filter_keys. The class is initialised with a list of keywords and the filter method in the class can use this list.
The next step is like the word count example. Each line is a word followed by a string of comma-separated file names. So, split the file names into a list and count the number of times each file is selected.
Sort the results in descending order (of the frequency) and print them. You can repeat the exercise for various keywords and the intermediate data set will be reused without the need to save it in a file. This is one of the major advantages of Spark over MapReduce.
From a developers perspective, the ability to code the entire analysis (which may consist of a sequence of MapReduce operations) in a single file makes it much easier to think through the problem and write better code.