Hadoop is a large scale open source storage and processing framework for data sets. This article builds upon the previous article in this series, Exploring Big Data on a Desktop: Getting Started with Hadoop that appeared in last months issue of OSFY. The focus, now, is on putting data into the system.
Once you have the basics of Hadoop functioning, the obvious question is: how does one put the data into the system? And the corollary to that is: how does one distribute the data between the various data nodes? Since the system can grow to thousands of data nodes, it is pretty obvious that you should not have to tell HDFS anything more than the bare minimum.
A simple test
Your first test can be to take a large text file and load it into HDFS. Start the Hadoop cluster as described in the previous article, with three nodes. On the first test, stop the datanode service on h-mstr and copy a large file on two slave nodes. (To create a text file for testing, take a hexdump of a 1GB tar you should get a text file of around 3GB.)
$ ssh fedora@h-mstr $ sudo systemctl stop hadoop-datanode $ hdfs dfs -copyFromLocal large_file.txt
You can verify that the data has been copied and how much resides on each node, as follows:
$ hadoop fs -ls -h large_file.txt Found 1 items -rw-r--r-- 1 fedora supergroup 3.3 G 2014-08-15 12:25 large_file.txt $ sudo du -sh /var/cache/hadoop-hdfs/ 13M /var/cache/hadoop-hdfs/ $ ssh -t fedora@h-slv1 du -sh /var/cache/hadoop-hdfs/ 1.7G /var/cache/hadoop-hdfs/ Connection to h-slv1 closed. $ ssh -t fedora@h-slv2 du -sh /var/cache/hadoop-hdfs/ [fedora@h-mstr ~]$ ssh -t fedora@h-slv2 sudo du -sh /var/cache/hadoop-hdfs/ 1.7G /var/cache/hadoop-hdfs/ Connection to h-slv2 closed.
The data file is distributed on h-slv1 and h-slv2. Now, start the datanode on h-mstr as well. Run the above commands again, as follows:
$ sudo systemctl start hadoop-datanode $ hdfs dfs -copyFromLocal large_file.txt second_large_file.txt $ hadoop fs -ls -h -rw-r--r-- 1 fedora supergroup 3.3 G 2014-08-15 12:25 large_file.txt -rw-r--r-- 1 fedora supergroup 3.3 G 2014-08-15 12:33 second_large_file.txt $ sudo du -sh /var/cache/hadoop-hdfs/ [fedora@h-mstr ~]$ sudo du -sh /var/cache/hadoop-hdfs/ 3.4G /var/cache/hadoop-hdfs/ $ ssh -t fedora@h-slv1 du -sh /var/cache/hadoop-hdfs/ 1.7G /var/cache/hadoop-hdfs/ Connection to h-slv1 closed. [fedora@h-mstr ~]$ ssh -t fedora@h-slv2 sudo du -sh /var/cache/hadoop-hdfs/ 1.7G /var/cache/hadoop-hdfs/ Connection to h-slv2 closed.
The new data seems to be stored on the h-mstr only. If you run the command from h-slv2, the data is likely to be stored on the h-slv2 datanode only. That is, HDFS will try to optimise and store the data close to the origin. You can find out about the status of HDFS, including the location of data blocks, by browsing http://h-mstr:50070/.
The file you want to put in HDFS may be on the desktop. You can, of course, access that file using nfs. However, if you have installed the HDFS binaries on your desktop, you can copy the file by running the following command on the desktop:
$ HADOOP_USER_NAME=fedora hdfs dfs -fs hdfs://h-mstr/ -put desktop_file.txt
The commands -put and -copyFromLocal are synonyms.
Replication
You should stop the Hadoop services on all the nodes. In /etc/hadoop/hdfs-site.xml, on each of the nodes, change the value of dfs.replication from 1 to 2 and the data blocks will be replicated on one additional datanode.
<property> <name>dfs.replication</name> <value>2</value> </property>
Restart the Hadoop services. Increase the replication for existing files and explore http://h-mstr:50070/.
$ hadoop fs -setrep -R 2 /user/fedora/
These experiments illustrate the resilience and flexibility of the Hadoop distributed file system, and the implicit optimisation to minimise the transfer of data across the network. You don’t need to worry about distribution of data.
During your experiments, in case you come across the error stating that the namenode is in safe mode, run the following command:
$ sudo runuser hdfs -s /bin/bash /bin/bash -c "hdfs dfsadmin -safemode leave"
Loading data programmatically
It is likely that you will need to get the data from some existing sources like files and databases and put what you need into HDFS. As the next experiment, you can take up the document files you may have collected over the years. Convert each into text and load them. Hadoop works best with large files and not with a collection of small files. So, all files will go into a single HDFS file, where each file will be a single record with the name of the file as the prefix. You don’t need to create a local file. The output of the program can be piped to the hdfs dfs -put command.
The following Python program load_doc_files.py illustrates the idea.
- It selects document files.
- Each document file is converted to a text file.
- The name of the file is output with a delimiter (a tab, in this case).
- Each line from the file is read and output with the new line character replaced by a delimiter.
- On reaching the end of the file, a new line character is written to the standard output.
#!/usr/bin/python from __future__ import print_function import sys import os import subprocess DELIM='\t' FILETYPES=['odt','doc','sxw','abw'] # use soffice to convert a document to text file def convert_to_text(inpath,infile): subprocess.call(['soffice','--headless','--convert-to','txt:Text', '--outdir','/tmp','/'.join([inpath,infile])]) return '/tmp/' + infile.rsplit('.',1)[0] + '.txt' # Convert & print file as a single line replacing '\n' by '\t' adding file name at the start def process_file(p,f): print("%s"%f, end=DELIM) textfile = convert_to_text(p,f) for line in open(textfile): print("%s"%line.strip(),end=DELIM) print() # Generator for files of type odt, doc, etc. def get_documents(path): for curr_path,dirs,files in os.walk(path): for f in files: try: if f.rsplit('.',1)[1].lower() in FILETYPES: yield curr_path,f except: pass # pass the root directory as a parameter for path,f in get_documents(sys.argv[1]): process_file(path,f)
You may run the above program as follows, from the desktop:
$ ./load_doc_files.py ~/Documents | HADOOP_USER_NAME=fedora \ hdfs dfs -fs hdfs://h-mstr/ -put doc_files.txt
The above program can be easily extended to convert and load PDF files as well. Or another program can be used to append to the existing HDFS file.
The obvious next step is that once the data is in Hadoop, you need to figure out how to use it and what you can do with it.
An example of modified word count
Hadoop Map-Reduce offers a very convenient streaming framework. It takes input lines from an HDFS file and passes them on standard input to one or more mapper programs. The standard output of the mapper programs is routed to the standard input of the reducer programs. The mapper and reducer programs could be in any language.
Each line of a mapper output is expected to be a keyword, value pair separated by a tab. All lines with the same keyword are routed to the same reducer.
A common first example is a word count in a text file. Each line of a text file is split into words by a mapper. It writes the word as the key with the value 1. The reducer counts the number of times a particular word is received. Since the word is the key, all occurrences of the same word will be routed to the same reducer.
In doc_files.txt, each line is actually a file. It is very likely that words will be repeated in a line. So, it is better if the mapper counts the words in a line before writing to stdout, as shown in the following mapper.py:
#!/usr/bin/python import sys import re # Generator for words in a line def read_input(file): for line in file: yield re.compile('\W*').split(line) def main(sep='\t'): for words in read_input(sys.stdin): wd_dict={} # count the number of occurences of a case-insensitive word for word in words: if word != '': wd = word.lower() wd_dict[wd] = wd_dict.get(wd,0) + 1 # print each word and its count for word in wd_dict: print('%s%s%d'%(word,sep,wd_dict[word])) if __name__ == "__main__" : main()
There is no need to change the reducer function as long as it had not assumed that the count of each word was 1. The corresponding reducer.py:
#!/usr/bin/python import sys # convert each line into a word and its count def read_mapper_output(file, sep): for line in file: yield line.strip().split(sep) # add the count for each occurrence of the word def main(sep='\t'): wc = {} for word, count in read_mapper_output(sys.stdin, sep): wc[word] = wc.get(word,0) + int(count) # print the sorted list for word in sorted(wc): print("%s%s%d"%(word,sep,wc[word])) if __name__ == "__main__": main()
So, having signed into fedora@h-mstr, you may run these commands to count the words and examine the result:
$ hadoop jar /usr/share/java/hadoop/hadoop-streaming.jar \ -files mapper.py,reducer.py -mapper mapper.py -reducer reducer.py\ -input document_files.txt -output wordcount.out $ hadoop fs -cat wordcount.out/part-00000 | less
As seen in the earlier experiments, you can store the data in a HDFS file using any programming language. Then, you can write fairly simple map and reduce programs in any programming language, without worrying about any issues related to distributed processing. Hadoop will make the effort to optimise the distribution of the data across the nodes as well as feed the data to the appropriate mapper programs.