Hadoop YARN Concepts with FIFO, Capacity and Fair Scheduling

Apache YARN (Yet Another Resource Negotiator) is Hadoop’s cluster resource management system. YARN was introduced in Hadoop 2 to improve the MapReduce implementation, but it is general enough to support other distributed computing paradigms as well. YARN provides APIs for requesting and working with cluster resources.

YARN provides its core services via two types of long-running daemon: a resource manager (one per cluster) to manage the use of resources across the cluster, and node managers running on all the nodes in the cluster to launch and monitor containers.

Below are the steps to run an application on YARN:

1. A client contacts the resource manager and asks it to run an application master process.

2. The resource manager then finds a node manager that can launch the application master in a container.

3. Precisely what the application master does once it is running depends on the application. It could simply run a computation in the container it is running in and return the result to the client. Or it could request more containers from the resource manager and use them to run a distributed computation. The latter is what the MapReduce YARN application does.
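To make step 1 concrete, here is a minimal sketch of a client submitting an application master using the YARN client API (org.apache.hadoop.yarn.client.api.YarnClient). It is a sketch, not a definitive implementation: the application name, launch command, and resource sizes are placeholders, and a couple of method names (such as setMemorySize) vary slightly between Hadoop versions.

import java.util.Collections;

import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;

public class SubmitApplicationSketch {
  public static void main(String[] args) throws Exception {
    // Step 1: the client contacts the resource manager.
    YarnClient yarnClient = YarnClient.createYarnClient();
    yarnClient.init(new YarnConfiguration());
    yarnClient.start();

    YarnClientApplication app = yarnClient.createApplication();
    ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
    appContext.setApplicationName("my-yarn-app");  // placeholder name

    // Describe how to launch the application master (command is a placeholder).
    ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
    amContainer.setCommands(
        Collections.singletonList("java -Xmx512m com.example.MyAppMaster"));
    appContext.setAMContainerSpec(amContainer);

    // Resources for the container that will host the application master.
    Resource amResource = Records.newRecord(Resource.class);
    amResource.setMemorySize(1024);  // MB; older releases use setMemory()
    amResource.setVirtualCores(1);
    appContext.setResource(amResource);

    // Step 2 is then YARN's job: the resource manager finds a node manager
    // that launches the application master in a container.
    yarnClient.submitApplication(appContext);
  }
}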

YARN has a flexible model for making resource requests. A request for a set of containers can express the amount of compute resources required for each container (memory and CPU), as well as locality constraints for the containers in that request.

Locality is critical in ensuring that distributed data processing algorithms use the cluster bandwidth efficiently, so YARN allows an application to specify locality constraints for the containers it is requesting. Locality constraints can be used to request a container on a specific node or rack, or anywhere on the cluster.

Sometimes the locality constraint cannot be met, in which case either no allocation is made or, optionally, the constraint can be loosened so that the container starts on a node in the same rack or, if that is not possible, on any node in the cluster.
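As a rough sketch of how an application master might express such a request with the AMRMClient API: the node and rack names below are hypothetical, and the relaxLocality flag tells the scheduler whether it may fall back to the same rack or to any node.

import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.util.Records;

public class LocalityRequestSketch {

  // Build a request for one container with 2 GB of memory and 1 virtual core,
  // preferring a specific (hypothetical) node and rack.
  public static AMRMClient.ContainerRequest buildRequest() {
    Resource capability = Records.newRecord(Resource.class);
    capability.setMemorySize(2048);   // MB; older releases use setMemory()
    capability.setVirtualCores(1);

    Priority priority = Priority.newInstance(0);

    return new AMRMClient.ContainerRequest(
        capability,
        new String[] { "node101.example.com" },  // preferred nodes (hypothetical)
        new String[] { "/rack1" },               // preferred racks (hypothetical)
        priority,
        true);                                   // relaxLocality: allow rack/cluster fallback
  }
}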

A YARN application can make resource requests at any time while it is running. For example, an application can make all of its requests up front, or it can take a more dynamic approach, requesting more resources as the needs of the application change. Spark takes the first approach, starting a fixed number of executors on the cluster, whereas MapReduce has two phases: the map task containers are requested up front, but the reduce task containers are not started until later. Also, if any tasks fail, additional containers will be requested so the failed tasks can be rerun.
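The dynamic style can be sketched loosely with the AMRMClient API: the application master registers with the resource manager, adds container requests as needed, and polls allocate() until they are granted. The class name, sleep interval, and progress value below are illustrative assumptions, not the MapReduce application master's actual logic.

import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class DynamicRequestSketch {

  // Ask the resource manager for a number of containers and wait until they
  // have all been granted. More requests can be added later at any time, for
  // example to rerun failed tasks.
  public static void requestContainers(ContainerRequest request, int needed) throws Exception {
    AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
    rmClient.init(new YarnConfiguration());
    rmClient.start();
    rmClient.registerApplicationMaster("", 0, "");  // host, RPC port, tracking URL omitted

    for (int i = 0; i < needed; i++) {
      rmClient.addContainerRequest(request);
    }

    int granted = 0;
    while (granted < needed) {
      // Each allocate() call is a heartbeat to the resource manager; any newly
      // granted containers are returned in the response.
      AllocateResponse response = rmClient.allocate(0.1f);
      granted += response.getAllocatedContainers().size();
      Thread.sleep(1000);
    }

    rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", "");
  }
}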

In MapReduce 1, there are two types of daemon that control the job execution process: a jobtracker and one or more tasktrackers. The jobtracker coordinates all the jobs run on the system by scheduling tasks to run on tasktrackers. Tasktrackers run tasks and send progress reports to the jobtracker, which keeps a record of the overall progress of each job. If a task fails, the jobtracker can reschedule it on a different tasktracker.

In MapReduce 1, the jobtracker takes care of both job scheduling (matching tasks with tasktrackers) and task progress monitoring (keeping track of tasks, restarting failed or slow tasks, and doing task bookkeeping, such as maintaining counter totals). By contrast, in YARN these responsibilities are handled by separate entities: the resource manager and an application master (one for each MapReduce job). The jobtracker is also responsible for storing job history for completed jobs, although it is possible to run a job history server as a separate daemon to take the load off the jobtracker. In YARN, the equivalent role is the timeline server, which stores application history.

The YARN equivalent of the jobtracker is the combination of resource manager, application master, and timeline server; the YARN equivalent of a tasktracker is the node manager; and the YARN equivalent of a slot is a container.

YARN was designed to address many of the limitations in MapReduce 1.

The benefits of using YARN:

Scalability – YARN can run on larger clusters than MapReduce 1. MapReduce 1 hits scalability bottlenecks in the region of 4,000 nodes and 40,000 tasks, stemming from the fact that the jobtracker has to manage both jobs and tasks. YARN overcomes these limitations by virtue of its split resource manager/application master architecture: it is designed to scale up to 10,000 nodes and 100,000 tasks. In contrast to the jobtracker, each instance of an application (here, a MapReduce job) has a dedicated application master, which runs for the duration of the application.

Availability – The large amount of rapidly changing complex state in the jobtracker’s memory (each task status is updated every few seconds, for example) makes it very difficult to retrofit high availability (HA) into the jobtracker service. With the jobtracker’s responsibilities split between the resource manager and the application master in YARN, making the service highly available became a divide-and-conquer problem: provide HA for the resource manager, then for YARN applications.

Utilization – In MapReduce 1, each tasktracker is configured with a static allocation of fixed-size slots, which are divided into map slots and reduce slots at configuration time. A map slot can only be used to run a map task, and a reduce slot can only be used for a reduce task.
In YARN, a node manager manages a pool of resources, rather than a fixed number of designated slots. MapReduce running on YARN will not hit the situation where a reduce task has to wait because only map slots are available on the cluster, which can happen in MapReduce 1. If the resources to run a task are available, then the application will be eligible for them.

Multitenancy – In some ways, the biggest benefit of YARN is that it opens up Hadoop to other types of distributed application beyond MapReduce. MapReduce is just one YARN application among many.

Scheduling in YARN

Below are the schedulers available in YARN:

1. FIFO Scheduler – YARN places applications in a queue and runs them in the order of submission (first in, first out). Requests for the first application in the queue are allocated first; once its requests have been satisfied, the next application in the queue is served, and so on. This scheduler is simple to understand and needs no configuration, but it is not suitable for shared clusters: large applications will use all the resources in the cluster, so each application has to wait its turn.

2. Capacity Scheduler – The Capacity Scheduler allows sharing of a Hadoop cluster along organizational lines, whereby each organization is allocated a certain capacity of the overall cluster. Each organization is set up with a dedicated queue that is configured to use a given fraction of the cluster capacity. A separate dedicated queue allows a small job to start as soon as it is submitted, although this comes at the cost of overall cluster utilization, since the queue capacity is reserved for jobs in that queue; it also means that a large job finishes later than it would under the FIFO Scheduler.

Queues may be further divided in hierarchical fashion, allowing each organization to share its cluster allowance between different groups of users within the organization. Within a queue, applications are scheduled using FIFO scheduling. If there are idle resources available, then the Capacity Scheduler may allocate the spare resources to jobs in the queue, even if that causes the queue’s capacity to be exceeded. This behavior is known as queue elasticity.


Note: In normal operation, the Capacity Scheduler does not preempt containers by forcibly killing them, so if a queue is under capacity due to lack of demand and then demand increases, the queue will only return to capacity as resources are released from other queues when containers complete. It is possible to mitigate this by configuring queues with a maximum capacity so that they don’t eat into other queues’ capacities too much. This is at the cost of queue elasticity, of course, so a reasonable trade-off should be found by trial and error.
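For illustration, here is a hedged sketch of a Capacity Scheduler queue hierarchy expressed through Hadoop's Configuration API; on a real cluster these keys live in capacity-scheduler.xml on the resource manager, and the queue names (prod, dev, eng, science) and percentages are made up for the example.

import org.apache.hadoop.conf.Configuration;

public class CapacityQueueSketch {

  // The same key/value pairs would normally live in capacity-scheduler.xml on
  // the resource manager; queue names and percentages here are illustrative.
  public static Configuration capacityQueues() {
    Configuration conf = new Configuration(false);

    // Two top-level queues under root, splitting the cluster 40/60.
    conf.set("yarn.scheduler.capacity.root.queues", "prod,dev");
    conf.set("yarn.scheduler.capacity.root.prod.capacity", "40");
    conf.set("yarn.scheduler.capacity.root.dev.capacity", "60");

    // Cap dev's elasticity so it cannot grow beyond 75% of the cluster.
    conf.set("yarn.scheduler.capacity.root.dev.maximum-capacity", "75");

    // Hierarchical queues: dev is shared equally by two groups of users.
    conf.set("yarn.scheduler.capacity.root.dev.queues", "eng,science");
    conf.set("yarn.scheduler.capacity.root.dev.eng.capacity", "50");
    conf.set("yarn.scheduler.capacity.root.dev.science.capacity", "50");

    return conf;
  }
}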


3. Fair Scheduler – With the Fair Scheduler there is no need to reserve a set amount of capacity, since it dynamically balances resources between all running jobs. Just after the first (large) job starts, it is the only job running, so it gets all the resources in the cluster. When the second (small) job starts, it is allocated half of the cluster resources, so that each job is using its fair share. In general, the Fair Scheduler attempts to allocate resources so that all running applications get the same share of resources.

To understand how resources are shared between queues, imagine two users, A and B, each with their own queue. A starts a job, and it is allocated all the available resources since there is no demand from B. Then B starts a job while A’s job is still running, and after a while each job is using half of the resources. Now if B starts a second job while the other jobs are still running, the new job will share B’s allocation with B’s first job, so each of B’s jobs will have one-fourth of the cluster resources, while A’s job will continue to have half. The result is that resources are shared fairly between users.

Note: On a shared cluster it is better to use the Capacity Scheduler or the Fair Scheduler. Both of these allow long-running jobs to complete in a timely manner, while still allowing users who are running concurrent smaller ad hoc queries to get results back in a reasonable time.
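Which scheduler the resource manager runs is itself configurable. The sketch below shows the relevant keys, again via the Configuration API rather than the yarn-site.xml file where they would normally be set; the choice of the Fair Scheduler and the allocation file name are assumptions for the example.

import org.apache.hadoop.conf.Configuration;

public class SchedulerChoiceSketch {

  // These keys would normally be set in yarn-site.xml on the resource manager.
  public static Configuration useFairScheduler() {
    Configuration conf = new Configuration(false);

    // The Capacity Scheduler is the default in stock Apache Hadoop; switch to
    // the Fair Scheduler by naming its class explicitly.
    conf.set("yarn.resourcemanager.scheduler.class",
        "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler");

    // Per-queue shares, weights, and scheduling policies live in an allocation
    // file (fair-scheduler.xml by default).
    conf.set("yarn.scheduler.fair.allocation.file", "fair-scheduler.xml");

    // Place applications in a queue named after the submitting user.
    conf.setBoolean("yarn.scheduler.fair.user-as-default-queue", true);

    return conf;
  }
}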


Queue placement

The way that you specify which queue an application is placed in is specific to the application. For example, in MapReduce, you set the property mapreduce.job.queuename to the name of the queue you want to use.
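For example, a MapReduce driver could pin its job to a hypothetical queue named prod as sketched below; the same property can also be passed on the command line with -D mapreduce.job.queuename=prod.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class QueuePlacementSketch {

  // Create a job that will be submitted to the (hypothetical) "prod" queue
  // instead of the default queue.
  public static Job jobInProdQueue() throws Exception {
    Configuration conf = new Configuration();
    conf.set("mapreduce.job.queuename", "prod");
    return Job.getInstance(conf, "queue-placement-example");
  }
}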

Delay Scheduling

All the YARN schedulers try to honor locality requests. On a busy cluster, if an application requests a particular node, there is a good chance that other containers are running on it at the time of the request. The obvious course of action is to immediately loosen the locality requirement and allocate a container on the same rack. However, it has been observed in practice that waiting a short time (no more than a few seconds) can dramatically increase the chances of being allocated a container on the requested node, and therefore increase the efficiency of the cluster. This feature is called delay scheduling, and it is supported by both the Capacity Scheduler and the Fair Scheduler.

Every node manager in a YARN cluster periodically sends a heartbeat request to the resource manager (by default, one per second). Heartbeats carry information about the node manager’s running containers and the resources available for new containers, so each heartbeat is a potential scheduling opportunity for an application to run a container. When using delay scheduling, the scheduler doesn’t simply use the first scheduling opportunity it receives, but waits for up to a given maximum number of scheduling opportunities to occur before loosening the locality constraint and taking the next scheduling opportunity.


Note: The Capacity Scheduler sets the property yarn.scheduler.capacity.node-locality-delay to a positive integer representing the number of scheduling opportunities it is prepared to miss before loosening the node constraint to match any node in the same rack. The Fair Scheduler uses yarn.scheduler.fair.locality.threshold.node; setting it to 0.5 means the scheduler should wait until half of the nodes in the cluster have presented scheduling opportunities before accepting another node in the same rack.
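As a rough sketch, the two knobs might be set as follows; the values are illustrative, and on a real cluster these keys belong in capacity-scheduler.xml and yarn-site.xml respectively rather than in application code.

import org.apache.hadoop.conf.Configuration;

public class DelaySchedulingSketch {

  // Illustrative values only; these keys normally live in capacity-scheduler.xml
  // and yarn-site.xml on the resource manager.
  public static Configuration delaySchedulingKnobs() {
    Configuration conf = new Configuration(false);

    // Capacity Scheduler: miss up to 40 scheduling opportunities (roughly one
    // heartbeat per node) before relaxing node locality to rack locality.
    conf.set("yarn.scheduler.capacity.node-locality-delay", "40");

    // Fair Scheduler: wait until half the cluster's nodes have offered a
    // scheduling opportunity before accepting a rack-local container instead.
    conf.set("yarn.scheduler.fair.locality.threshold.node", "0.5");

    return conf;
  }
}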