Life cycle of a MapReduce program – job submission, job initialization, task assignment, task execution, progress updates, and job completion

You can run a MapReduce job with a single method call: submit() on a Job object. Alternatively, you can call waitForCompletion(), which submits the job if it hasn’t been submitted already and then waits for it to finish.
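As a sketch of the client side (the driver class and job name here are illustrative), a typical driver configures a Job and then calls waitForCompletion():

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.TokenCounterMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");   // illustrative job name
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(TokenCounterMapper.class);    // built-in word-count mapper
        job.setReducerClass(IntSumReducer.class);        // built-in summing reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  // must not already exist

        // waitForCompletion() submits the job if it hasn't been submitted already,
        // then polls its progress until it finishes; 'true' echoes progress to the console.
        // Calling job.submit() instead would return immediately after submission.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```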

At a high level, the process involves the following entities:

1. The client, which submits the MapReduce job.

2. The YARN resource manager, which coordinates the allocation of compute resources on the cluster.

3. The YARN node managers, which launch and monitor the compute containers on machines in the cluster.

4. The MapReduce application master, which coordinates the tasks running the MapReduce job. The application master and the MapReduce tasks run in containers that are scheduled by the resource manager and managed by the node managers.

5. The distributed filesystem, which is used for sharing job files between the other entities.

Job Submission

The submit() method on Job creates an internal JobSubmitter instance and calls submitJobInternal() on it. Having submitted the job, waitForCompletion() polls the job’s progress once per second and reports the progress to the console if it has changed since the last report. When the job completes successfully, the job counters are displayed. Otherwise, the error that caused the job to fail is logged to the console.

The job submission process implemented by JobSubmitter does the following:

1. Asks the resource manager for a new application ID, used for the MapReduce job ID.

2. Checks the output specification of the job. For example, if the output directory has not been specified or it already exists, the job is not submitted and an error is thrown to the MapReduce program.

3. Computes the input splits for the job. If the splits cannot be computed (because the input paths don’t exist, for example), the job is not submitted and an error is thrown to the MapReduce program.

4. Copies the resources needed to run the job, including the job JAR file, the configuration file, and the computed input splits, to the shared filesystem in a directory named after the job ID. The job JAR is copied with a high replication factor (controlled by the mapreduce.client.submit.file.replication property, which defaults to 10) so that there are plenty of copies across the cluster for the node managers to access when they run tasks for the job.

The client running the job calculates the splits by calling getSplits() on the job’s InputFormat, then sends them to the application master, which uses their storage locations to schedule map tasks that will process them on the cluster. Each map task passes its split to the createRecordReader() method on InputFormat to obtain a RecordReader for that split. A RecordReader is little more than an iterator over records; the map task uses one to generate record key-value pairs, which it passes to the map function. A sketch of this contract follows the list.

5. Submits the job by calling submitApplication() on the resource manager.
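The skeleton below makes the split/record-reader relationship from step 4 concrete. It is a hypothetical format that simply reuses FileInputFormat’s split computation and hands each split to a LineRecordReader, mirroring what the built-in TextInputFormat does:

```java
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class SimpleLineInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        // The client calls this during job submission; the resulting splits
        // (with their storage locations) are written to the shared filesystem.
        return super.getSplits(job);
    }

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) {
        // Each map task calls this with its assigned split to obtain an
        // iterator-like reader that produces the key-value pairs fed to map().
        return new LineRecordReader();
    }
}
```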

Job Initialization

1. When the resource manager receives a call to its submitApplication() method, it hands off the request to the YARN scheduler.

2. The scheduler allocates a container, and the resource manager then launches the application master’s process there, under the node manager’s management.

3. The application master for MapReduce jobs is a Java application whose main class is MRAppMaster. It initializes the job by creating a number of bookkeeping objects to keep track of the job’s progress, as it will receive progress and completion reports from the tasks.

4. Next, it retrieves the input splits computed in the client from the shared filesystem. It then creates a map task object for each split, as well as a number of reduce task objects determined by the mapreduce.job.reduces property (which is set by the setNumReduceTasks() method on Job). Tasks are given IDs at this point.

5. The application master must decide how to run the tasks that make up the MapReduce job. If the job is small, the application master may choose to run the tasks in the same JVM as itself. This happens when it judges that the overhead of allocating and running tasks in new containers outweighs the gain to be had in running them in parallel, compared to running them sequentially on one node. Such a job is said to be uberized, or run as an uber task.

6. Finally, before any tasks can be run, the application master calls the setupJob() method on the OutputCommitter. For FileOutputCommitter, which is the default, it will create the final output directory for the job and the temporary working space for the task output.

Note : By default, a small job is one that has fewer than 10 mappers, only one reducer, and an input size that is less than the size of one HDFS block. These values may be changed for a job via mapreduce.job.ubertask.maxmaps, mapreduce.job.ubertask.maxreduces, and mapreduce.job.ubertask.maxbytes. Uber tasks must be enabled explicitly for an individual job, or across the cluster, by setting mapreduce.job.ubertask.enable to true.
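As an illustration of the knobs mentioned above, the following sketch (class name and values are arbitrary) sets the reducer count and enables uber tasks for a single job:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class UberJobConfig {
    public static Job configure() throws Exception {
        Configuration conf = new Configuration();

        // Allow this job to run "uberized" inside the application master's JVM.
        conf.setBoolean("mapreduce.job.ubertask.enable", true);
        // Optionally tighten what counts as "small"; defaults are 10 maps,
        // 1 reduce, and an input smaller than one HDFS block.
        conf.setInt("mapreduce.job.ubertask.maxmaps", 5);
        conf.setInt("mapreduce.job.ubertask.maxreduces", 1);

        Job job = Job.getInstance(conf, "small job");  // illustrative job name
        job.setNumReduceTasks(1);                      // sets mapreduce.job.reduces
        return job;
    }
}
```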

Task Assignment

1. If the job does not qualify for running as an uber task, then the application master requests containers for all the map and reduce tasks in the job from the resource manager. Requests for map tasks are made first and with a higher priority than those for reduce tasks, since all the map tasks must complete before the sort phase of the reduce can start. Requests for reduce tasks are not made until 5% of map tasks have completed.

2. Reduce tasks can run anywhere in the cluster, but requests for map tasks have data locality constraints that the scheduler tries to honor. In the optimal case, the task is data local, that is, it runs on the same node that the split resides on. Alternatively, the task may be rack local: on the same rack, but not the same node, as the split. Some tasks are neither data local nor rack local and retrieve their data from a different rack than the one they are running on. For a particular job run, you can determine the number of tasks that ran at each locality level by looking at the job’s counters, such as DATA_LOCAL_MAPS.

3. Requests also specify memory requirements and CPUs for tasks. By default, each map and reduce task is allocated 1,024 MB of memory and one virtual core. These values are configurable on a per-job basis via the following properties: mapreduce.map.memory.mb, mapreduce.reduce.memory.mb, mapreduce.map.cpu.vcores, and mapreduce.reduce.cpu.vcores.
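A hedged example of overriding these resource requests per job follows; the values shown are illustrative, not recommendations:

```java
import org.apache.hadoop.conf.Configuration;

public class TaskResourceConfig {
    public static Configuration configure() {
        Configuration conf = new Configuration();
        // Per-task container sizes requested from the resource manager
        // (defaults are 1024 MB and 1 virtual core for both task types).
        conf.setInt("mapreduce.map.memory.mb", 2048);
        conf.setInt("mapreduce.reduce.memory.mb", 4096);
        conf.setInt("mapreduce.map.cpu.vcores", 1);
        conf.setInt("mapreduce.reduce.cpu.vcores", 2);
        return conf;
    }
}
```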

Task Execution

1. Once a task has been assigned resources for a container on a particular node by the resource manager’s scheduler, the application master starts the container by contacting the node manager.

2. The task is executed by a Java application whose main class is YarnChild. Before it can run the task, it localizes the resources that the task needs, including the job configuration and JAR file, and any files from the distributed cache.

3. Finally, it runs the map or reduce task.

Note : YarnChild runs in a dedicated JVM, so that any bugs in the user-defined map and reduce functions, or even in YarnChild itself, don’t affect the node manager by causing it to crash or hang. Each task can perform setup and commit actions, which are run in the same JVM as the task itself and are determined by the OutputCommitter for the job. For file-based jobs, the commit action moves the task output from a temporary location to its final location. The commit protocol ensures that when speculative execution is enabled, only one of the duplicate tasks is committed and the other is aborted.
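To show where the setup and commit actions hang off the API, here is a skeletal committer, a sketch only; real file-based jobs normally rely on the default FileOutputCommitter rather than writing their own:

```java
import java.io.IOException;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Skeletal committer illustrating the hooks discussed above; FileOutputCommitter
// implements them by moving task output from a temporary to a final location.
public class SketchOutputCommitter extends OutputCommitter {

    @Override
    public void setupJob(JobContext context) throws IOException {
        // Called once by the application master before any tasks run,
        // e.g. to create the job's output directory and temporary workspace.
    }

    @Override
    public void setupTask(TaskAttemptContext context) throws IOException {
        // Per-task-attempt setup, run in the same JVM as the task.
    }

    @Override
    public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
        // Return true only if this attempt produced output to commit; with
        // speculative execution, at most one duplicate attempt is committed.
        return false;
    }

    @Override
    public void commitTask(TaskAttemptContext context) throws IOException {
        // Promote this attempt's output to its final location.
    }

    @Override
    public void abortTask(TaskAttemptContext context) throws IOException {
        // Discard this attempt's output (e.g. the losing speculative attempt).
    }
}
```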

Progress and Status Updates

When a task is running, it keeps track of its progress, that is, the proportion of the task completed. For map tasks, this is the proportion of the input that has been processed. For reduce tasks, it’s a little more complex, but the system can still estimate the proportion of the reduce input processed. It does this by dividing the total progress into three parts, corresponding to the three phases of the shuffle.

Progress reporting is important, as Hadoop will not fail a task that’s making progress. All of the following operations constitute progress:

1. Reading an input record in a mapper or reducer.

2. Writing an output record in a mapper or reducer.

3. Setting the status description via Reporter’s or TaskAttemptContext’s setStatus() method.

4. Incrementing a counter using Reporter’s incrCounter() method or Counter’s increment() method.

5. Calling Reporter’s or TaskAttemptContext’s progress() method.
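For example, a long-running map() method might report progress explicitly as in the sketch below; the mapper class, counter group, and counter name are made up for illustration:

```java
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SlowRecordMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // A status message, visible in the web UI and task logs.
        context.setStatus("processing record at offset " + key.get());

        // Incrementing a counter also counts as progress.
        context.getCounter("MyApp", "RECORDS_SEEN").increment(1);  // hypothetical counter

        // For records that take a long time to process, calling progress()
        // tells the framework the task is still alive.
        context.progress();

        context.write(value, NullWritable.get());
    }
}
```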

Note : As the map or reduce task runs, the child process communicates with its parent application master through the umbilical interface. Every three seconds, the task reports its progress and status (including counters) back to its application master, which has an aggregate view of the job.

Job Completion

When the application master receives a notification that the last task for a job is complete, it changes the status for the job to successful. Then, when the Job polls for status, it learns that the job has completed successfully, so it prints a message to tell the user and then returns from the waitForCompletion() method. Job statistics and counters are printed to the console at this point.
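As a small follow-up to the earlier driver sketch, the client can also read the job’s counters programmatically once waitForCompletion() returns; TaskCounter is a built-in counter enum, while the class and method names here are illustrative:

```java
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCounter;

public class JobResultCheck {
    // 'job' is assumed to be a fully configured Job, as in the earlier driver sketch.
    public static void runAndReport(Job job) throws Exception {
        if (job.waitForCompletion(true)) {
            Counters counters = job.getCounters();
            long mapInputRecords =
                    counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
            System.out.println("Map input records: " + mapInputRecords);
        } else {
            System.err.println("Job failed; see the console log for the cause.");
        }
    }
}
```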

Finally, on job completion, the application master and the task containers clean up their working state (so intermediate output is deleted), and the OutputCommitter’s commitJob() method is called. Job information is archived by the job history server to enable later interrogation by users if desired.