Programming in Hadoop

Basic Hadoop API

The following method signatures form the boilerplate for most Hadoop programs (shown here using the classic org.apache.hadoop.mapred API):

// Mapper
void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter)
void configure(JobConf job)
void close() throws IOException

// Reducer/Combiner
void reduce(K2 key, Iterator<V2> values, OutputCollector<K3,V3> output, Reporter reporter)
void configure(JobConf job)
void close() throws IOException

// Partitioner
int getPartition(K2 key, V2 value, int numPartitions)
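As an illustration, a mapper written against these classic signatures might look like the following sketch. The class name and its task are illustrative (it emits the byte length of each input line with a count of 1); MapReduceBase supplies empty default implementations of configure() and close():

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Illustrative mapper: for each input line, emits (line length in bytes, 1).
// A matching reducer could then count how many lines have each length.
public class LineLengthMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, IntWritable, IntWritable> {

    public void map(LongWritable key, Text value,
                    OutputCollector<IntWritable, IntWritable> output, Reporter reporter)
            throws IOException {
        // collect() hands each intermediate (key, value) pair to the framework.
        output.collect(new IntWritable(value.getLength()), new IntWritable(1));
    }
}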

Programmers mainly specify two functions:

  • map (k, v) → <k’, v’>*

  • reduce (k’, [v’]) → <k’, v’>*

All values with the same key are sent to the same reducer.

Programmers may also specify:

  • partition (k’, number of partitions) → partition for k’

This is often a simple hash of the key, e.g. hash(k’) mod n, where n is the number of reducers. The aim is to divide up the key space for parallel reduce operations, i.e. the partitioner controls which reducer processes which keys. Since all values with the same key go to the same reducer, the outputs of the reducers are disjoint, i.e. no two reducers emit overlapping tuples. (A sketch of such a partitioner appears after this list.)

  • combine (k’, v’) → <k’, v’>*

Combiners are mini-reducers that run in memory immediately after the map phase, on the same machines as the mappers. They perform local aggregation of intermediate results before the tuples are sent to the reducers, which reduces network traffic and lightens the load on the reducers. In many cases (WordCount, for example) the combiner uses the exact same code as the reducer and performs the same operation locally; this works only when the reduce operation is commutative and associative and the combiner's input and output types match the mapper's output types.
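Returning to the partition function above, here is a minimal sketch of a custom partitioner, written against the newer org.apache.hadoop.mapreduce API with an illustrative class name. It simply hashes the key modulo the number of reducers, which is essentially what Hadoop's default HashPartitioner does:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Sends each key to a reducer chosen by hashing the key.
public class WordPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        // Mask the sign bit so the modulo result is never negative.
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

Such a partitioner would be enabled with job.setPartitionerClass(WordPartitioner.class).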

Another component, the Reporter, allows us to report status back to Hadoop.

The execution framework handles everything else, apart from the above operations, including:

  • Scheduling: assigns workers to map and reduce tasks

  • Data distribution: moves processes to data

  • Synchronization: gathers, sorts, and shuffles intermediate data

  • Errors and faults: detects worker failures and restarts

We don’t know:

  • Where mappers and reducers run

  • When a mapper or reducer begins or finishes

  • Which input a particular mapper is processing

  • Which intermediate key a particular reducer is processing

Note: Hadoop uses the class definition to create objects. So, when submitting a job to Hadoop, we must provide the class definitions for the mapper, reducer, combiner, etc., so that Hadoop can create as many instances (objects) of those classes at runtime as required.

The Need for Combiners

The following image depicts a MapReduce job without combiners:

Consider a sample WordCount job. Every mapper outputs a Text key and an IntWritable value. Each reducer gets a Text key and a list of IntWritable values. Data generated by the mappers has to be written to disk before it is sent to the reducers, and the reducers have to aggregate all the tuples coming from all the mappers.

The following image depicts a MapReduce job with combiners:

Combiners reside on the same machine as the mappers, so data doesn't have to be written to disk while moving between a mapper and its combiner. Combiners run the same code as the reducers, but they run it locally, i.e. on the data generated by their corresponding mappers. This is called local aggregation. It not only speeds up processing but also reduces the workload on the reducers, since the reducers now receive locally aggregated tuples.
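For example, if one mapper in a WordCount job produced the (hypothetical) intermediate tuples below, the combiner would collapse them before they leave the mapper's machine:

Mapper output:    (the, 1), (cat, 1), (the, 1), (sat, 1), (the, 1)
Combiner output:  (the, 3), (cat, 1), (sat, 1)

The reducers now receive three tuples from this mapper instead of five.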

Sample MapReduce Boilerplate Code for WordCount

WordCount using Hadoop is analogous to HelloWorld programs in other languages. The aim is to count the number of occurrences of each word in a text file, using distributed computing. The following is the pseudo-code for the map and reduce functions of the WordCount program:

Map(String docid, String text):
     for each word w in text:
          Emit(w, 1);

Reduce(String term, Iterator<Int> values):
     int sum = 0;
     for each v in values:
          sum += v;
     Emit(term, sum);

For each word in the document, mappers will output tuples of the form (word, 1).

Each reducer receives an array of 1s for each word, in the form (word, [1, 1, 1,...]).

The number of occurrences of each word is calculated by counting the number of 1s for each word.

Note that if combiners were used before the tuples went to the reducers, the array will not contain only 1s, since local aggregation would have taken place. In such a case, we can't simply count the number of elements in the array (as in the previous case) to get the word count. Instead, we will need to sum up all the numbers in the array to get the count for the word.
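A minimal Java version of this pseudo-code, using the newer org.apache.hadoop.mapreduce API (class names are illustrative), might look as follows. Note that the reducer sums the values rather than counting them, so it works unchanged whether or not a combiner is configured:

// WordCountMapper.java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Emit (word, 1) for every token on the line.
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, ONE);
        }
    }
}

// WordCountReducer.java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the counts instead of counting the 1s, so the same class
        // can also be used as a combiner.
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        context.write(key, new IntWritable(sum));
    }
}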

Debugging in Hadoop

Debugging refers to the process of removing bugs from our code.

It is important to know how to debug a program that runs on a distributed framework like Hadoop, since we can't use the same step-by-step debugging technique across a cluster that we would use on a single machine. Even print statements are of limited use, because each task writes its output to its own log on whichever node it runs on.

In Hadoop, debugging is done using counters and log files. It is important to debug using only a small subset of our data instead of the entire dataset (remember, we are dealing with big data!); this saves both time and computational resources.
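As a sketch, a mapper can increment custom counters while it processes records; the framework sums the counters from every task and reports the totals along with the job status and in the logs. The class, enum, and counter names here are made up for illustration:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CountingMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Custom counters; the enum name becomes the counter group.
    public enum DebugCounters { EMPTY_LINES, PROCESSED_LINES }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        if (value.toString().trim().isEmpty()) {
            // Counter values from all tasks are aggregated by the framework.
            context.getCounter(DebugCounters.EMPTY_LINES).increment(1);
            return;
        }
        context.getCounter(DebugCounters.PROCESSED_LINES).increment(1);
        context.write(value, new IntWritable(1));
    }
}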

Note: The Tool interface can be used to run a MapReduce program in local mode, where we can single-step debug it on our local machine.

A Simple Overview of a MapReduce Job

  1. Configure the Job: Specify Input, Output, Mapper, Reducer and Combiner

  2. Implement the Mapper: For example, to count the number of occurrences of words in a line, tokenize the text and emit the words with a count of 1 i.e. <word, 1>

  3. Implement the Reducer: Sum up counts for each word and write the result to HDFS

  4. Run the Job

The entire workflow of a MapReduce job exclusively uses <k, v> pairs and can be denoted as follows:

(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)

1. Configure the Job

  • The Job class encapsulates information about the job and handles the execution of the job.

  • A job is packaged within a jar file and this file is distributed among nodes by the Hadoop framework. We need to specify our jar file to Hadoop using the Job.setJarByClass() function.

  • Specify the Input: Input is specified via an InputFormat implementation, for example TextInputFormat. The input can be a file, a directory, or a file pattern. The InputFormat is responsible for creating the input splits (InputSplits) and a RecordReader, and it also determines the types of the input (key, value) pairs. With TextInputFormat, mappers receive the input one line at a time.

TextInputFormat.addInputPath(job, new Path(args[0]));
job.setInputFormatClass(TextInputFormat.class);

  • Specify the Output: Output is specified via an OutputFormat implementation, for example TextOutputFormat, which defines how the results of the MapReduce job are written. We must specify an output folder, and the job will fail if that folder already exists. By default, a MapReduce job uses a single reducer; if we use multiple reducers, there will be multiple output files, one per reducer, and these part files must be merged (for example with hadoop fs -getmerge) if a single output file is required.

TextOutputFormat.setOutputPath(job, new Path(args[1]));
job.setOutputFormatClass(TextOutputFormat.class);

We must also set the output types for the (key, value) pairs for both mappers and reducers:

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

Usually we use the same output types for both mappers and reducers, but if we need to set different types, we can use setMapOutputKeyClass(), setMapOutputValueClass(), etc.
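Putting these configuration steps together, a driver for the WordCount job sketched earlier might look like this (WordCountMapper and WordCountReducer are the illustrative classes from the WordCount section):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "word count");

        // Tell Hadoop which jar to ship to the nodes.
        job.setJarByClass(WordCountDriver.class);

        // Mapper, combiner and reducer classes (the combiner reuses the reducer).
        job.setMapperClass(WordCountMapper.class);
        job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class);

        // Output (key, value) types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Input and output; the output folder must not already exist.
        TextInputFormat.addInputPath(job, new Path(args[0]));
        job.setInputFormatClass(TextInputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setOutputFormatClass(TextOutputFormat.class);

        // Run the job and wait for it to finish.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}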

2. Implement the Mapper

  • The Mapper class has four type parameters: input key, input value, output key, output value

  • It makes use of Hadoop's IO framework for input/output operations

  • We must define the map() function that takes some input (key, value) pair and outputs another (key, value) pair, depending on the problem at hand

3. Implement the Reducer

  • The Reducer class also has four type parameters: input key, input value, output key, output value

  • The (key, value) pairs generated by the mappers are grouped by key, sorted and sent to the reducers; each reducer receives all values corresponding to a certain key

  • We must implement the reduce() function that takes some input (key, <set of values>) pair and outputs (key, value) pairs

  • The output types of the map function must match the input types of the reduce function

We can choose to use combiners immediately after the mappers, provided that the combiner's input and output types match the mappers' output types. This reduces the number of tuples sent to the reducers, thanks to local aggregation by the combiners.

4. Run the Job

In this stage, we run the MapReduce job and the output(s) get(s) saved in the output folder.

The image below depicts the working of a MapReduce job in a simplified block diagram:

MapReduce Streaming

Using MapReduce Streaming, we can use languages other than Java for our MapReduce programs, as long as the language supports standard input and output. The limitation of MapReduce Streaming is that the input and output keys must always be Text; no other type is supported for the keys.
