job chaining in mapreduce with jobcontrol ,controlledJob and driver

Job chaining is extremely important to understand and have an operational plan for in your environment. Many people find that they can’t solve a problem with a single Map Reduce job. Some jobs in a chain will run in parallel, some will have their output fed into other jobs, and so on. Once you start to understand how to start solving problems as a series of MapReduce jobs, you’ll be able to tackle a whole new class of challenges.

With the Driver

Probably the simplest method for performing job chaining is to have a master driver that simply fires off multiple job-specific drivers.Take the driver for each MapReduce job and call them in the sequence they should run. You’ll have to specifically be sure that the output path of the first job is the input path of the second.In a production scenario, the temporary directories should be cleaned up so they don’t linger past the completion of the job. Lack of discipline here can surprisingly fill up your cluster rather quickly. Also, be careful of how much temporary data you are actually creating because you’ll need storage in your file system to store that data.You can pretty easily extrapolate this approach to create chains that are much longer than just two jobs. Just be sure to keep track of all of the temporary paths and optionally clean up the data not being used anymore as the job runs.

You can also fire off multiple jobs in parallel by using Job.submit() instead of Job.waitForCompletion(). The submit method returns immediately to the current thread and runs the job in the background. This allows you to run several jobs at once. Use Job.isComplete(), a nonblocking job completion check, to constantly poll to see whether all of the jobs are complete. The other thing to pay attention to is job success. It’s not good enough to just know that the job completed. You also need to check whether it succeeded or not. If a dependency job failed, you should break out of the entire chain instead of trying to let it continue.

Below is a total order sorting driver in which job chaining is used

In the first job We are processing the employee input data using the SalarySortingMapper and writting the salary as key and the actual record as value which is stored as a SequenceFileOutputFormat.

The input to the second job is the output from the first job were we are using job chaining.The first job is checked for success before executing the second job.

Note : whether a job is success or failure its very important that intermediate output directory is cleaned up. This is an important and often overlooked step. Leaving any intermediate output will fill up a cluster quickly and require you to delete the output by hand.

 

Note : The driver in parallel job chaining is similar to the previous example. The only big enhancement is that jobs are submitted in parallel and then monitored until completion. The two jobs run in this example are independent. (However, they require the previous example to have completed successfully.) This has the added benefit of utilizing cluster resources better to have them execute simultaneously. This can be achieved by using the helper function which is configured for each job. It looks very standard to any other configuration, except Job.submit is used rather than Job.waitForCompletion. This will submit the job and then immediately return, allowing the application to continue.

import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

public class DriverTotalOrderSorting {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

args = new String[] { "Replace this string with Input Path location","Replace this string with output Path location","Replace this string with staging Path location","Replace this string with partition folder Path location","Replace this string with partition file Path location" };

FileUtils.deleteDirectory(new File(args[1]));
FileUtils.deleteDirectory(new File(args[2]));
FileUtils.deleteDirectory(new File(args[3]));
Configuration conf = new Configuration();
Path inputPath = new Path(args[0]);
Path partitionFile = new Path(args[4]);
Path outputStage = new Path(args[2]);
Path outputOrder = new Path(args[1]);
Job sampleJob = Job.getInstance(conf);
sampleJob.setJobName("sorting_first_level_job");
sampleJob.setJarByClass(DriverTotalOrderSorting.class);
sampleJob.setMapperClass(SalarySortingMapper.class);
sampleJob.setNumReduceTasks(0);
sampleJob.setOutputKeyClass(DoubleWritable.class);
sampleJob.setOutputValueClass(Text.class);
TextInputFormat.setInputPaths(sampleJob, inputPath);
sampleJob.setOutputFormatClass(SequenceFileOutputFormat.class);
TextOutputFormat.setOutputPath(sampleJob, outputStage);
int code = sampleJob.waitForCompletion(true) ? 0 : 1;

if (code == 0) {
Job orderJob = Job.getInstance(conf);
orderJob.setJarByClass(DriverTotalOrderSorting.class);
orderJob.setJobName("Sorting_Phase");
// Identity mapper to output the key/value pairs in the SequenceFile
orderJob.setMapperClass(Mapper.class);
orderJob.setReducerClass(SortingValueReducer.class);
orderJob.setNumReduceTasks(10);
// Use Hadoop's TotalOrderPartitioner class
orderJob.setPartitionerClass(TotalOrderPartitioner.class);
// Set the partition file
TotalOrderPartitioner.setPartitionFile(orderJob.getConfiguration(), partitionFile);
orderJob.setOutputKeyClass(DoubleWritable.class);
orderJob.setOutputValueClass(Text.class);
// Set the input to the previous job's output
orderJob.setInputFormatClass(SequenceFileInputFormat.class);
TextInputFormat.setInputPaths(orderJob, outputStage);
// Set the output path to the command line parameter
TextOutputFormat.setOutputPath(orderJob, outputOrder);
orderJob.getConfiguration().set("mapreduce.output.textoutputformat.separator", "");
// Use the InputSampler to go through the output of the previous
// job, sample it, and create the partition file
double sample = 20d;
InputSampler.writePartitionFile(orderJob, new InputSampler.RandomSampler<Text, Text>(sample, 10000));
// Submit the job
code = orderJob.waitForCompletion(true) ? 0 : 1;
}
System.exit(code);

}
}

With JobControl and ControlledJob

The JobControl and ControlledJob classes make up a system for chaining MapReduce jobs and has some nice features like being able to track the state of the chain and fire off jobs automatically when they’re ready by declaring their dependencies. Using JobControl is the right way of doing job chaining, but can sometimes be too heavyweight for simpler applications.

Note : You still have to keep track of temporary data and clean it up afterwards or in the event of a failure.

Below is the code snippet to use the job controller.


Configuration conf = new Configuration();

//First job

Job firstJob = getFirstJob(conf, "Input path", "output path");
Job secondJob = getSecondJob(conf, "Input path", "output path");
Job thirdJob = getThirdJob(conf, "Input path", "output path");
Job fourthJob = getFourthJob(conf, "Input path", "output path");

if (firstJob.waitForCompletion(true)) {

ControlledJob secondControlledJob = new ControlledJob(secondJob);
ControlledJob thirdControlledJob = new ControlledJob(thirdJob);
thirdControlledJob.addDependingJob(secondControlledJob);
ControlledJob fourthControlledJob = new ControlledJob(fourthJob);
fourthControlledJob.addDependingJob(thirdControlledJob);
JobControl jc = new JobControl("job_chaining");
jc.addJob(secondControlledJob);
jc.addJob(thirdControlledJob);
jc.addJob(fourthControlledJob);
jc.run();
code = jc.getFailedJobList().size() == 0 ? 0 : 1;

}

//Code to delete all the temporary data.