In this post I will explain the Hadoop Streaming utility. Hadoop Streaming uses executables or scripts to create a MapReduce job and submits the job to a Hadoop cluster. Hadoop's programming model is called MapReduce. In a previous post I explained MapReduce using a Unix pipe which combines two Python scripts and a few Linux commands. In this post I will use the same Python scripts and Hadoop Streaming to run them as a MapReduce job on a Hadoop cluster.
In hindsight, this post was the most difficult one to write so far. I am using the Hadoop Book as a travel guide for my BigData Investigation. The book provides an example for running Ruby scripts on Hadoop, but I failed to run it on my Hadoop cluster. I have since understood that the Hadoop cluster of my setup was configured in Pseudo-Distributed Mode, whereas the example requires a Hadoop cluster in Standalone (Local) Mode. I will explain the different Hadoop cluster modes in the next post.
I recommend that you read my introduction to MapReduce (BigData Investigation 4 – MapReduce Explained) before you continue with this post. In the MapReduce Explained post I analyzed the sample data and the Python scripts from the Hadoop Book and executed them in a Unix pipe. The example provides separate scripts for map (max_temperature_map.py) and reduce (max_temperature_reduce.py). For your convenience I repeat the output of the Unix pipe here. See the MapReduce Explained post for details.
[cloudera@quickstart hadoop-book]$ cat input/ncdc/sample.txt | \
ch02-mr-intro/src/main/python/max_temperature_map.py | \
sort | \
ch02-mr-intro/src/main/python/max_temperature_reduce.py
1949 111
1950 22
[cloudera@quickstart hadoop-book]$
For this post I use the Cloudera QuickStart VM. Log in to the VM (user: cloudera, password: cloudera) and get the examples and the sample data of the Hadoop Book.
login as: cloudera
cloudera@127.0.0.1's password:
Last login: Mon Aug 29 13:12:04 2016 from 10.0.2.2
[cloudera@quickstart ~]$ git clone https://github.com/tomwhite/hadoop-book.git
Initialized empty Git repository in /home/cloudera/hadoop-book/.git/
remote: Counting objects: 4931, done.
remote: Total 4931 (delta 0), reused 0 (delta 0), pack-reused 4931
Receiving objects: 100% (4931/4931), 2.59 MiB | 59 KiB/s, done.
Resolving deltas: 100% (1975/1975), done.
[cloudera@quickstart ~]$ cd hadoop-book/
[cloudera@quickstart hadoop-book]$ ls -l ch02-mr-intro/src/main/python/
total 8
-rwxrwxr-x 1 cloudera cloudera 231 Aug 29 13:13 max_temperature_map.py
-rwxrwxr-x 1 cloudera cloudera 374 Aug 29 13:13 max_temperature_reduce.py
[cloudera@quickstart hadoop-book]$ ls -l input/ncdc/sample.txt
-rw-rw-r-- 1 cloudera cloudera 529 Aug 29 13:13 input/ncdc/sample.txt
[cloudera@quickstart hadoop-book]$
Hadoop Streaming supports any executable or script as mapper or reducer. The only requirement is that the executables or scripts read their input from Standard Input (stdin) and write their results to Standard Output (stdout). The reference for this post is the documentation of Hadoop Streaming for Hadoop 2.7.2 (Hadoop 2.7.2 was the latest stable Hadoop release when I wrote this post). Hadoop Streaming supports plenty of additional command options which we do not need for this post. See the documentation for details.
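The stdin/stdout contract can be sketched in a few lines of Python. This is a minimal illustration and not the scripts from the Hadoop Book: the input format (one "year temperature" record per line) is a simplified assumption, and the names map_lines, reduce_lines and run_pipeline are hypothetical. A real streaming script would iterate over sys.stdin and print to sys.stdout; here an in-memory stream stands in for stdin so the example runs anywhere.

```python
import io


def map_lines(lines):
    """Mapper side of the contract: read raw lines, emit 'key<TAB>value' lines."""
    for line in lines:
        fields = line.split()
        if len(fields) != 2:
            continue  # skip malformed records (assumed format: "year temperature")
        year, temperature = fields
        yield "%s\t%s" % (year, temperature)


def reduce_lines(sorted_lines):
    """Reducer side: read key-sorted 'key<TAB>value' lines, emit the maximum per key."""
    current_key, current_max = None, None
    for line in sorted_lines:
        key, value = line.rstrip("\n").split("\t")
        value = int(value)
        if key != current_key:
            if current_key is not None:
                yield "%s\t%d" % (current_key, current_max)
            current_key, current_max = key, value
        else:
            current_max = max(current_max, value)
    if current_key is not None:
        yield "%s\t%d" % (current_key, current_max)


def run_pipeline(stdin):
    """Local stand-in for: cat input | mapper | sort | reducer."""
    mapped = map_lines(stdin)
    shuffled = sorted(mapped)  # plays the role of the `sort` step between map and reduce
    return list(reduce_lines(shuffled))


# Hypothetical sample records, not the NCDC format used by the book.
fake_stdin = io.StringIO("1950 0\n1950 22\n1950 -11\n1949 111\n1949 78\n")
for output_line in run_pipeline(fake_stdin):
    print(output_line)
```

Because both functions only consume and produce lines of text, the same logic can sit behind a Unix pipe or behind Hadoop Streaming without changes.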
The documentation starts with the following example.
hadoop jar hadoop-streaming-2.7.2.jar \
  -input myInputDirs \
  -output myOutputDir \
  -mapper /bin/cat \
  -reducer /usr/bin/wc
We need to adjust the option values to run the example Python scripts with the sample data provided by the Hadoop Book on the Cloudera QuickStart VM. I found the correct syntax after some trial and error.
Hadoop Streaming is shipped as a jar file which needs to be passed as an argument to the hadoop CLI command. First we need to find the hadoop-streaming jar file.
[cloudera@quickstart hadoop-book]$ locate hadoop-streaming | grep jar
/usr/jars/hadoop-streaming-2.6.0-cdh5.7.0.jar
/usr/jars/hadoop-streaming-2.6.0-mr1-cdh5.7.0.jar
/usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.6.0-mr1-cdh5.7.0.jar
/usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-mr1.jar
/usr/lib/hadoop-mapreduce/hadoop-streaming-2.6.0-cdh5.7.0.jar
/usr/lib/hadoop-mapreduce/hadoop-streaming.jar
/usr/lib/oozie/oozie-sharelib-mr1/lib/mapreduce-streaming/hadoop-streaming.jar
/usr/lib/oozie/oozie-sharelib-yarn/lib/mapreduce-streaming/hadoop-streaming.jar
[cloudera@quickstart hadoop-book]$ ls -l /usr/lib/hadoop-mapreduce/hadoop-streaming.jar
lrwxrwxrwx 1 root root 35 Apr 5 23:49 /usr/lib/hadoop-mapreduce/hadoop-streaming.jar -> hadoop-streaming-2.6.0-cdh5.7.0.jar
[cloudera@quickstart hadoop-book]$
From my various failed attempts I know that we need to specify the full path for the mapper script and the reducer script. The following command brings us closer to the correct syntax, but we are not there yet. The issue is that Hadoop expects the input data in the HDFS file system, while I have stored it on the local Linux file system. HDFS is the default distributed file system which is shipped with Hadoop. I will explain HDFS in a future post.
[cloudera@quickstart hadoop-book]$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
  -input /home/cloudera/hadoop-book/input/ncdc/sample.txt \
  -output output \
  -mapper /home/cloudera/hadoop-book/ch02-mr-intro/src/main/python/max_temperature_map.py \
  -reducer /home/cloudera/hadoop-book/ch02-mr-intro/src/main/python/max_temperature_reduce.py
packageJobJar: [] [/usr/jars/hadoop-streaming-2.6.0-cdh5.7.0.jar] /tmp/streamjob305702993711004322.jar tmpDir=null
16/08/29 13:44:07 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/08/29 13:44:07 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/08/29 13:44:09 INFO mapreduce.JobSubmitter: Cleaning up the staging area /tmp/hadoop-yarn/staging/cloudera/.staging/job_1472469537947_0002
16/08/29 13:44:09 WARN security.UserGroupInformation: PriviledgedActionException as:cloudera (auth:SIMPLE) cause:org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://quickstart.cloudera:8020/home/cloudera/hadoop-book/input/ncdc/sample.txt
16/08/29 13:44:09 WARN security.UserGroupInformation: PriviledgedActionException as:cloudera (auth:SIMPLE) cause:org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://quickstart.cloudera:8020/home/cloudera/hadoop-book/input/ncdc/sample.txt
16/08/29 13:44:09 ERROR streaming.StreamJob: Error Launching job : Input path does not exist: hdfs://quickstart.cloudera:8020/home/cloudera/hadoop-book/input/ncdc/sample.txt
Streaming Command Failed!
[cloudera@quickstart hadoop-book]$
For execution on a multi-node Hadoop cluster we would need to copy the sample data into HDFS, but for our purposes we can tweak the URI of the input data to tell the hadoop CLI command to read the input data from the local Linux file system. I used the same trick to write the output to the local Linux file system instead of HDFS. This command completes successfully. The output illustrates that Hadoop Streaming monitors the progress of the job execution and reports regular status updates.
[cloudera@quickstart hadoop-book]$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
  -input file:/home/cloudera/hadoop-book/input/ncdc/sample.txt \
  -output file:/tmp/output_storageulf \
  -mapper /home/cloudera/hadoop-book/ch02-mr-intro/src/main/python/max_temperature_map.py \
  -reducer /home/cloudera/hadoop-book/ch02-mr-intro/src/main/python/max_temperature_reduce.py
packageJobJar: [] [/usr/jars/hadoop-streaming-2.6.0-cdh5.7.0.jar] /tmp/streamjob3758230746893225679.jar tmpDir=null
16/08/29 14:26:48 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/08/29 14:26:49 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/08/29 14:26:49 INFO mapred.FileInputFormat: Total input paths to process : 1
16/08/29 14:26:49 INFO mapreduce.JobSubmitter: number of splits:2
16/08/29 14:26:50 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1472469537947_0005
16/08/29 14:26:50 INFO impl.YarnClientImpl: Submitted application application_1472469537947_0005
16/08/29 14:26:50 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1472469537947_0005/
16/08/29 14:26:50 INFO mapreduce.Job: Running job: job_1472469537947_0005
16/08/29 14:27:00 INFO mapreduce.Job: Job job_1472469537947_0005 running in uber mode : false
16/08/29 14:27:00 INFO mapreduce.Job: map 0% reduce 0%
16/08/29 14:27:13 INFO mapreduce.Job: map 50% reduce 0%
16/08/29 14:27:14 INFO mapreduce.Job: map 100% reduce 0%
16/08/29 14:27:21 INFO mapreduce.Job: map 100% reduce 100%
16/08/29 14:27:21 INFO mapreduce.Job: Job job_1472469537947_0005 completed successfully
16/08/29 14:27:21 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=865
        FILE: Number of bytes written=347468
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=210
        HDFS: Number of bytes written=0
        HDFS: Number of read operations=2
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=0
    Job Counters
        Launched map tasks=2
        Launched reduce tasks=1
        Rack-local map tasks=2
        Total time spent by all maps in occupied slots (ms)=22524
        Total time spent by all reduces in occupied slots (ms)=5034
        Total time spent by all map tasks (ms)=22524
        Total time spent by all reduce tasks (ms)=5034
        Total vcore-seconds taken by all map tasks=22524
        Total vcore-seconds taken by all reduce tasks=5034
        Total megabyte-seconds taken by all map tasks=23064576
        Total megabyte-seconds taken by all reduce tasks=5154816
    Map-Reduce Framework
        Map input records=5
        Map output records=5
        Map output bytes=55
        Map output materialized bytes=77
        Input split bytes=210
        Combine input records=0
        Combine output records=0
        Reduce input groups=2
        Reduce shuffle bytes=77
        Reduce input records=5
        Reduce output records=2
        Spilled Records=10
        Shuffled Maps =2
        Failed Shuffles=0
        Merged Map outputs=2
        GC time elapsed (ms)=351
        CPU time spent (ms)=1450
        Physical memory (bytes) snapshot=558338048
        Virtual memory (bytes) snapshot=4510806016
        Total committed heap usage (bytes)=391979008
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=794
    File Output Format Counters
        Bytes Written=29
16/08/29 14:27:21 INFO streaming.StreamJob: Output directory: file:/tmp/output_storageulf
[cloudera@quickstart hadoop-book]$
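The only difference between the failed and the successful command is the URI scheme of the input and output paths. The following Python sketch illustrates that behaviour; it is not Hadoop's actual code. The function name qualify is hypothetical, and the default file system value is an assumption taken from the error message of the failed attempt. A path without a scheme is resolved against the default file system (HDFS on this VM), while a path with an explicit scheme such as file: is left untouched.

```python
from urllib.parse import urlparse

# Assumption: default file system as it appears in the error message of the failed run.
DEFAULT_FS = "hdfs://quickstart.cloudera:8020"


def qualify(path, default_fs=DEFAULT_FS):
    """Illustrative only: show which file system an absolute path ends up on."""
    parsed = urlparse(path)
    if parsed.scheme:
        # An explicit scheme (for example file:) selects the file system directly.
        return path
    if not parsed.path.startswith("/"):
        # Relative paths are resolved differently and are out of scope for this sketch.
        raise ValueError("this sketch handles absolute paths only")
    # No scheme: the path is interpreted on the default file system.
    return default_fs.rstrip("/") + parsed.path


print(qualify("/home/cloudera/hadoop-book/input/ncdc/sample.txt"))
print(qualify("file:/home/cloudera/hadoop-book/input/ncdc/sample.txt"))
```

The first call yields the same hdfs:// location that the failed job reported as missing; the second call keeps the local path as given.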
The output of the reduce function was written to /tmp/output_storageulf/part-00000. It is the same as the output of the Unix pipe which I have pasted at the beginning of this post.
[cloudera@quickstart hadoop-book]$ ls -l /tmp/output_storageulf/
total 4
-rw-r--r-- 1 yarn yarn 17 Aug 29 14:27 part-00000
-rw-r--r-- 1 yarn yarn 0 Aug 29 14:27 _SUCCESS
[cloudera@quickstart hadoop-book]$ cat /tmp/output_storageulf/part-00000
1949 111
1950 22
[cloudera@quickstart hadoop-book]$
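The job counters above report two map tasks and one reduce task, yet the result matches the single Unix pipe. A minimal Python sketch of why that works, under stated assumptions: the records are a simplified "year temperature" stand-in for the real sample data, the split boundary is chosen arbitrarily, the names map_task and reduce_task are hypothetical, and the two map tasks run one after the other here instead of in parallel.

```python
from itertools import groupby


def map_task(split):
    """One map task: turn 'year temperature' records into (year, temperature) pairs."""
    return [(year, int(temp)) for year, temp in (record.split() for record in split)]


def reduce_task(sorted_pairs):
    """One reduce task: expects pairs sorted by year, emits the maximum per year."""
    return [(year, max(temp for _, temp in group))
            for year, group in groupby(sorted_pairs, key=lambda pair: pair[0])]


# Hypothetical simplified records, not the NCDC format of the book's sample file.
records = ["1950 0", "1950 22", "1950 -11", "1949 111", "1949 78"]

# Unix pipe equivalent: one mapper over the whole input, sort, one reducer.
single = reduce_task(sorted(map_task(records)))

# Hadoop equivalent: the input is cut into two splits, each handled by its own map task.
splits = [records[:3], records[3:]]
map_outputs = [map_task(split) for split in splits]

# Shuffle: merge all map outputs and sort by key before the single reduce task runs.
merged = sorted(pair for output in map_outputs for pair in output)
parallel = reduce_task(merged)

print(single)
print(parallel)
```

Both lists are identical because the reducer only depends on the sorted set of key/value pairs, not on which map task produced them.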
Please note that all output files and directories are owned by yarn:yarn. YARN is the scheduler of Hadoop. The ownership of the output directory and its files by yarn:yarn indicates that the Hadoop Streaming utility indeed created a MapReduce job which was submitted to the Hadoop cluster for execution. I will explain YARN in a future post.
Ulf’s Conclusion
The Hadoop Streaming utility is easy to use. It allows you to use executables or any scripting language to create MapReduce applications. In contrast to a Unix pipe, Hadoop Streaming runs the executables or scripts on many nodes in parallel and monitors the progress of the job until it completes.
In the next post I will explain the Hadoop Cluster modes.
Changes:
2016/09/23 – added link – “Hadoop Cluster modes” => BigData Investigation 6 – Hadoop Cluster Modes