In this post I will explain MapReduce, Hadoop’s programming model for analyzing data. For my BigData investigation I use the Hadoop Book, which covers MapReduce in chapter 2. Let’s study the book’s examples to understand the model.
All code examples of the Hadoop Book are available on GitHub. First we need to copy the example data and code from GitHub to a Linux server. I executed the examples on the Cloudera Quickstart VM which I installed earlier, though in hindsight any Linux server will work for this post. Hadoop is not yet required.
login as: cloudera
cloudera@127.0.0.1's password:
Last login: Sat Aug 13 11:42:17 2016 from 10.0.2.2
[cloudera@quickstart ~]$ git clone https://github.com/tomwhite/hadoop-book.git
Initialized empty Git repository in /home/cloudera/hadoop-book/.git/
remote: Counting objects: 4931, done.
remote: Total 4931 (delta 0), reused 0 (delta 0), pack-reused 4931
Receiving objects: 100% (4931/4931), 2.59 MiB | 462 KiB/s, done.
Resolving deltas: 100% (1975/1975), done.
[cloudera@quickstart ~]$ cd hadoop-book/
[cloudera@quickstart hadoop-book]$ ls
appc           ch06-mr-dev       ch14-flume   ch20-hbase         hadoop-meta
book           ch08-mr-types     ch15-sqoop   ch21-zk            input
ch02-mr-intro  ch09-mr-features  ch16-pig     ch22-case-studies  pom.xml
ch03-hdfs      ch10-setup        ch17-hive    common             README.md
ch04-yarn      ch12-avro         ch18-crunch  conf               snippet
ch05-io        ch13-parquet      ch19-spark   hadoop-examples
[cloudera@quickstart hadoop-book]$
MapReduce programs can be written in Java or in any programming language that supports standard streams, i.e. reads from standard input (stdin) and writes to standard output (stdout). The book provides examples in Java, Ruby, and Python that process a text file with five lines of data. Let’s look at the text file first.
[cloudera@quickstart hadoop-book]$ cat input/ncdc/sample.txt
0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999
0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999
0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999
0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999
0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999
[cloudera@quickstart hadoop-book]$
The lines represent weather data in a fixed-width format that is difficult for humans to read. MapReduce allows you to quickly analyze billions of such lines on a Hadoop cluster to gain insight. Each of the above lines includes a year and a measured temperature. For instance, the third line tells us that in the year 1950 a temperature of -1.1 °C was measured.
0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999
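The two fields sit at fixed character positions within each record. As a quick illustration, Python’s string slicing recovers both of them. The offsets 15:19 and 87:92 are the ones the book’s map script uses; the snippet itself is my own sketch, not code from the book.

# Slice the year and the signed temperature (tenths of a degree Celsius)
# out of one fixed-width NCDC record; offsets as in the book's map script.
record = "0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999"
year = record[15:19]        # "1950"
temp = int(record[87:92])   # -11, i.e. -1.1 degrees Celsius
print(year, temp / 10.0)    # 1950 -1.1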
Each MapReduce application consists of three parts:
- The Map function is an application-specific function which extracts the relevant information from each line.
- The Reduce function is an application-specific function which consolidates the information extracted from all lines.
- The Hadoop framework is a generic framework, shared by all applications, which ensures parallel execution on hundreds or thousands of nodes.
The Hadoop Book provides example code in Java, Ruby, and Python, where the Map function extracts the year and the temperature from each line and the Reduce function determines each year’s maximum temperature. I will start with the Python example, given that I do not know Ruby and have limited Java skills.
The book illustrates Python programs for map (max_temperature_map.py) and reduce (max_temperature_reduce.py) which together comprise less than 20 lines of code. Follow the links in the previous sentence to look at the code. The interesting thing is that the Python scripts can be executed without Hadoop, using Unix pipes. Note that the temperatures are reported in tenths of a degree, so the maximum of the year 1949 was 11.1 °C and the maximum of 1950 was 2.2 °C.
[cloudera@quickstart hadoop-book]$ ls -l ch02-mr-intro/src/main/python/
total 8
-rwxrwxr-x 1 cloudera cloudera 231 Aug 14 03:20 max_temperature_map.py
-rwxrwxr-x 1 cloudera cloudera 374 Aug 14 03:20 max_temperature_reduce.py
[cloudera@quickstart hadoop-book]$ cat input/ncdc/sample.txt | \
ch02-mr-intro/src/main/python/max_temperature_map.py | \
sort | \
ch02-mr-intro/src/main/python/max_temperature_reduce.py
1949    111
1950    22
[cloudera@quickstart hadoop-book]$
Let’s see what each step of the Unix pipe does. The Unix cat command prints the content of the file with the sample data. We already used it at the beginning of this post, but for your convenience I paste the output here again.
[cloudera@quickstart hadoop-book]$ cat input/ncdc/sample.txt
0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999
0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999
0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999
0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999
0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999
[cloudera@quickstart hadoop-book]$
The Map function (max_temperature_map.py) extracts the year and the temperature from each line and prints them on a single line. Such a tuple of year and temperature is also referred to as a key-value pair.
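In case you do not want to follow the links above: a minimal sketch of such a map script (my reconstruction of the idea, not the book’s exact code) reads each record from stdin, slices out the year and the temperature, skips missing or low-quality readings, and writes one tab-separated key-value pair per line to stdout.

#!/usr/bin/env python
# Sketch of a streaming Map function: records in on stdin,
# "year<TAB>temperature" key-value pairs out on stdout.
import re
import sys

for line in sys.stdin:
    val = line.strip()
    year, temp, quality = val[15:19], val[87:92], val[92:93]
    # +9999 marks a missing reading; the quality code flags suspect ones.
    if temp != "+9999" and re.match("[01459]", quality):
        print("%s\t%s" % (year, temp))

Emitting the temperature as the raw tenths-of-a-degree string keeps the map step trivial; converting to a number can wait until the reduce step actually compares values.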
[cloudera@quickstart hadoop-book]$ cat input/ncdc/sample.txt | \
ch02-mr-intro/src/main/python/max_temperature_map.py
1950    +0000
1950    +0022
1950    -0011
1949    +0111
1949    +0078
[cloudera@quickstart hadoop-book]$
Hadoop sorts all key-value pairs before it passes them to the Reduce function. The Unix sort command does the same in our Unix pipe.
[cloudera@quickstart hadoop-book]$ cat input/ncdc/sample.txt | \
ch02-mr-intro/src/main/python/max_temperature_map.py | \
sort
1949    +0078
1949    +0111
1950    +0000
1950    -0011
1950    +0022
[cloudera@quickstart hadoop-book]$
Finally, the Reduce function (max_temperature_reduce.py) picks each year’s maximum temperature and prints it.
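Here, too, a hedged sketch of the algorithm rather than the book’s exact code: since the incoming keys arrive sorted, the script can stream through the pairs, keep only the current year and its running maximum in memory, and flush a result line whenever the year changes.

#!/usr/bin/env python
# Sketch of a streaming Reduce function: expects "year<TAB>temp" pairs
# on stdin, sorted by year; prints each year's maximum temperature.
import sys

last_key, max_val = None, None
for line in sys.stdin:
    key, val = line.strip().split("\t")
    if last_key is not None and key != last_key:
        print("%s\t%s" % (last_key, max_val))  # year changed: flush result
        last_key, max_val = key, int(val)
    else:
        last_key = key
        max_val = int(val) if max_val is None else max(max_val, int(val))

if last_key is not None:
    print("%s\t%s" % (last_key, max_val))      # flush the final year

Run against the sorted map output above, such a script prints exactly the two result lines we see below.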
[cloudera@quickstart hadoop-book]$ cat input/ncdc/sample.txt | \
ch02-mr-intro/src/main/python/max_temperature_map.py | \
sort | \
ch02-mr-intro/src/main/python/max_temperature_reduce.py
1949    111
1950    22
[cloudera@quickstart hadoop-book]$
The sort is not really needed for this example, which processes only five lines. However, the algorithm of the max_temperature_reduce.py script assumes that the keys of the input stream are sorted: as the sketch above shows, sorted input lets the Reduce function keep just one running maximum in memory instead of a table with an entry for every key. I suspect that this pattern improves the performance of the Reduce function when millions of key-value pairs need to be reduced.
Our Unix pipe creates a single instance of the Map function (max_temperature_map.py) and a single instance of the Reduce function (max_temperature_reduce.py). This is fine for development and education purposes, where only a few lines of data are analyzed. In contrast, Hadoop spawns hundreds or thousands of instances to analyze huge amounts of data on many nodes in parallel.
Ulf’s Conclusion
MapReduce is a simple programming model for data analysis. The use of programming languages like Java or Python allows you to analyze a broad range of unstructured data, including measured data, log files, web pages, images, and videos. The Hadoop framework ensures that the analysis jobs are executed on hundreds or thousands of nodes in parallel, so that huge amounts of data can be analyzed quickly.
In the next post I will explain how to run the Python scripts on a Hadoop cluster.
Changes:
2016/09/16 – added link – “how to run the Python scripts on a Hadoop cluster” => BigData Investigation 5 – MapReduce with Python and Hadoop Streaming