A Typical Large Data Problem

Any project that deals with big data can usually be broken down into the following steps:

  1. Iterate over a large number of records

  2. Extract something of interest from each (map)

  3. Shuffle and sort intermediate results

  4. Aggregate intermediate results (reduce)

  5. Generate final output

The key idea is to provide a functional abstraction for these tasks and allow the programmer to focus on writing just the map and reduce programs.
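To make these steps concrete, here is a minimal sketch of the classic word-count job written against Hadoop's mapreduce API (the class names and structure are illustrative assumptions, not taken from these notes): the mapper implements step 2 and the reducer implements step 4, while the framework handles step 3 (shuffle and sort) in between.

```java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Map: extract something of interest from each record (step 2 above).
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        StringTokenizer tokens = new StringTokenizer(line.toString());
        while (tokens.hasMoreTokens()) {
            word.set(tokens.nextToken());
            context.write(word, ONE);          // emit (word, 1)
        }
    }
}

// Reduce: aggregate the intermediate results (step 4 above).
class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable count : counts) {
            sum += count.get();
        }
        context.write(word, new IntWritable(sum));  // emit (word, total)
    }
}
```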

Anatomy of a Hadoop Job

A MapReduce program in Hadoop is called a Hadoop job.

  • Jobs are divided into map and reduce tasks

  • A running instance of a task is called a task attempt

  • Multiple jobs can be composed into a workflow

The job submission process is as follows:

  • Client (i.e., driver program) creates a job, configures it, and submits it to the JobTracker

  • JobClient computes input splits (on the client side)

  • Job data (jar, configuration XML) are sent to JobTracker

  • JobTracker puts job data in a shared location, enqueues tasks

  • TaskTrackers poll for tasks
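As a hedged sketch of the client/driver side of this process, the following driver creates a job, configures it, and submits it. It assumes the WordCountMapper and WordCountReducer classes from the earlier sketch and takes the HDFS input and output paths as command-line arguments.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");   // create and name the job

        job.setJarByClass(WordCountDriver.class);        // jar that is shipped to the cluster
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));    // input in HDFS
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output in HDFS

        // Submits the job (input splits are computed on the client) and waits for completion.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```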

The above image depicts the entire workflow of a job in Hadoop.

  • InputSplits: describe how to break an input file into the chunks that individual map tasks read and convert into usable records. This is especially helpful in the case of JSON files, whose multi-line records are enclosed within curly braces { }. HDFS splits the input file into blocks without regard to record boundaries, so a split may end with an incomplete record (i.e. one that has an opening '{' but no closing '}'). In such a case, the reader can continue past the split boundary and fetch the remaining bytes of the record from the next block, which may reside on another DataNode. Note that the InputSplit presents a byte-oriented view of the data; this data is handed to the RecordReaders.

  • RecordReaders: are responsible for converting the byte-oriented view presented by the InputSplit into a record-oriented view. These records are then fed to the mappers.

  • Mappers: As discussed earlier, the mapper generates (key, value) pairs, which are then passed to the partitioners. Note that Hadoop decides the number of mappers based on the number of file splits/chunks/blocks; basically, one map task is spawned per InputSplit.

  • Partitioners: decide which keys are to be processed by which reducers. They do so using a hash function on the key; HashPartitioner is the default partitioner. (A minimal custom partitioner is sketched after this list.)

  • Reducers: perform aggregation to reduce the number of tuples. Note that the number of reducers must be specified by the MapReduce programmer using Job.setNumReduceTasks(int) (default = 1). We can also have 0 reducers, depending on the job to be performed. A good rule of thumb, however, is between 0.95 and 1.75 times the number of nodes. With 0.95, all of the reducers can launch immediately and start transferring map outputs as the maps finish. With 1.75, the faster nodes finish their first round of reduces and then launch a second wave, which does a much better job of load balancing.

  • RecordWriters: write records to the output file.

Note: If we need to write our own InputSplit, we must implement the InputSplit interface. FileSplit is the default InputSplit.
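As a minimal sketch of the partitioning step described above, the following custom partitioner does essentially what the default HashPartitioner does: it hashes the key and takes the result modulo the number of reduce tasks (the class name is a made-up example).

```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Assigns each key to a reducer by hashing it, mirroring the default HashPartitioner.
public class WordPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        // Mask off the sign bit so the result is non-negative, then mod by the reducer count.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
```

It would be registered in the driver with job.setPartitionerClass(WordPartitioner.class).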

Shuffle and Sort in Hadoop

tl;dr

Shuffling refers to fetching the relevant mapper outputs, and sorting is the grouping of mapper outputs by key, in order to prepare the input for the reducers. The shuffle and sort phases occur simultaneously, i.e. mapper outputs are merged while they are being fetched.

Detailed Explanation:

Shuffling is the process by which intermediate output from mappers is transferred to the reducers. It occurs as follows:

  • Mapper outputs are first transferred to an in-memory buffer

  • When the buffer reaches a threshold, its contents are spilled to the disk

  • Spills are then merged into a single partitioned file (sorted within each partition), which the reducers fetch
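The buffer size and the spill threshold are configurable. As a hedged sketch (the property names below are the usual Hadoop 2.x names and may differ across versions), they can be tuned on the job's Configuration:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SpillTuning {
    public static Job configure() throws Exception {
        Configuration conf = new Configuration();
        // Size of the in-memory buffer that map outputs are written to, in MB.
        conf.set("mapreduce.task.io.sort.mb", "256");
        // Fraction of the buffer that, once filled, triggers a spill to disk.
        conf.set("mapreduce.map.sort.spill.percent", "0.80");
        return Job.getInstance(conf, "job with a larger sort buffer");
    }
}
```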

Sorting is a multi-pass merge of mapper outputs. It occurs as follows:

  • First, mapper outputs are copied to the reducer machines

  • Then, merging of mapper outputs is performed (based on the keys)

  • Then, every reducer obtains all values associated with the same key

Sorting therefore helps a reducer easily tell when a new group of values should begin, which saves time: the reducer simply starts a new reduce call whenever the next key in the sorted input differs from the previous key.
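The following standalone sketch (plain Java rather than the Hadoop API, with made-up data) illustrates this idea: because equal keys are adjacent in the sorted input, a single pass can detect group boundaries simply by watching for a key change.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class SortedGrouping {
    public static void main(String[] args) {
        // Sorted mapper output: equal keys are adjacent, so a key change marks a new group.
        List<Map.Entry<String, Integer>> sorted = Arrays.asList(
                new SimpleEntry<>("apple", 1),
                new SimpleEntry<>("apple", 1),
                new SimpleEntry<>("banana", 1));

        String currentKey = null;
        int sum = 0;
        for (Map.Entry<String, Integer> pair : sorted) {
            if (currentKey != null && !currentKey.equals(pair.getKey())) {
                System.out.println(currentKey + " -> " + sum);  // "reduce" the finished group
                sum = 0;
            }
            currentKey = pair.getKey();
            sum += pair.getValue();
        }
        if (currentKey != null) {
            System.out.println(currentKey + " -> " + sum);      // flush the last group
        }
    }
}
```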

The diagram below depicts the shuffling and sorting phases in a MapReduce job.

It is important to note that shuffling and sorting will not be performed at all if we specify zero reducers. In such a case, the MapReduce job stops at the map phase itself.
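A map-only job of this kind is requested in the driver by setting the number of reduce tasks to zero. A hedged sketch, reusing the mapper from the earlier example:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MapOnlyDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "map-only job");
        job.setJarByClass(MapOnlyDriver.class);
        job.setMapperClass(WordCountMapper.class);   // reusing the mapper sketched earlier
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Zero reducers: no shuffle, no sort; mapper output is written straight to HDFS.
        job.setNumReduceTasks(0);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```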

Another diagram that depicts shuffle and sort operations:

Hadoop Workflow

As shown in the image:

  1. We must first load data into HDFS

  2. Then, we must develop our code locally

  3. We must then submit our code (containing map and reduce instructions) to HDFS (this code is referred to as the MapReduce job in the above image)

  4. Step 2 can be repeated iteratively to fix any errors; finally, we must retrieve the results from HDFS
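Steps 1 and 4 are usually done with the hdfs dfs command-line tool, but they can also be done programmatically. The following hedged sketch uses the FileSystem API; all paths and file names are made-up examples (part-r-00000 is the conventional name of the first reducer's output file).

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsRoundTrip {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();   // picks up core-site.xml etc. from the classpath
        FileSystem fs = FileSystem.get(conf);

        // Step 1: load the local input data into HDFS.
        fs.copyFromLocalFile(new Path("input.txt"), new Path("/user/me/input/input.txt"));

        // ... develop and submit the MapReduce job here (see the driver sketch above) ...

        // Step 4: retrieve the results from HDFS once the job has finished.
        fs.copyToLocalFile(new Path("/user/me/output/part-r-00000"), new Path("results.txt"));
    }
}
```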

Tools for Synchronization

As we know, in a distributed computing environment such as Hadoop, synchronization is of utmost importance. A few mechanisms we can use to achieve it include:

  • Cleverly-constructed data structures

  • Sort order of intermediate keys

  • Using a partitioner that decides which keys will be processed by which reducer

  • Preserving state in mappers and reducers (i.e. capturing dependencies across multiple keys and values)

The following diagram shows how to preserve state in mappers and reducers:
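In code, state is typically preserved across calls to map() (or reduce()) through instance fields that are initialized in setup() and flushed in cleanup(). The following hedged sketch applies this pattern to word counting, aggregating counts in memory across all records of a task (an "in-mapper combining" style; the class name is a made-up example):

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Preserves state (the counts map) across all map() calls of a single task,
// emitting the aggregated pairs only once, in cleanup().
public class StatefulWordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private Map<String, Integer> counts;

    @Override
    protected void setup(Context context) {
        counts = new HashMap<>();            // state initialized once per task
    }

    @Override
    protected void map(LongWritable offset, Text line, Context context) {
        StringTokenizer tokens = new StringTokenizer(line.toString());
        while (tokens.hasMoreTokens()) {
            counts.merge(tokens.nextToken(), 1, Integer::sum);   // update state, emit nothing yet
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            context.write(new Text(e.getKey()), new IntWritable(e.getValue()));
        }
    }
}
```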

The Issue with Garbage Collection

Garbage Collection is a process by which the JVM (Java Virtual Machine) gets rid of unwanted objects.

Garbage collection is a time-consuming task, and it also occurs in the JVM that runs the NameNode of HDFS. When that JVM performs garbage collection, EVERYTHING STOPS: the entire cluster comes to a grinding halt until the JVM is done with the collection. Twitter reportedly had one of its clusters stop for 22 hours to perform garbage collection!

So, we should avoid creating unnecessary objects. That way, the JVM has fewer unreachable/unwanted objects to get rid of, and long garbage collection pauses can be avoided.
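A common way to do this in MapReduce code is to allocate Writable objects once per task and reuse them for every record, instead of constructing new ones inside map(). A hedged sketch (the mapper and its record format are made-up examples):

```java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ReuseObjectsMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Allocated once per task and reused for every record, instead of
    // creating fresh Text/IntWritable objects on each map() call.
    private final Text outKey = new Text();
    private final IntWritable outValue = new IntWritable();

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        String[] fields = line.toString().split("\t");
        outKey.set(fields[0]);              // reuse, don't new Text(...)
        outValue.set(fields.length);        // reuse, don't new IntWritable(...)
        context.write(outKey, outValue);    // Hadoop serializes the pair immediately, so reuse is safe
    }
}
```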

The next section gives a brief overview of the Google File System and how HDFS differs from it.
