Job merging: an optimization that lets two unrelated jobs loading the same data share the MapReduce pipeline

Job merging is an optimization aimed at reducing the amount of I/O through the MapReduce pipeline. It allows two unrelated jobs that load the same data to share the MapReduce pipeline, so the data needs to be loaded and parsed only once. For some large-scale jobs, that load-and-parse step can be the most expensive part of the whole operation.

Assume we have two or more jobs that need to run over the exact same massive data set. Each of these jobs loads and parses the data, then performs its own computation. With job merging, we’ll have one MapReduce job that logically performs both jobs at once without mixing the two applications.

Unfortunately, you must satisfy a number of prerequisites before applying this pattern. The most obvious one is that both jobs need to have the same intermediate keys and output formats, because they’ll be sharing the pipeline and thus need to use the same data types. Serialization or polymorphism can be used to work around this, but it adds a bit of complexity.

At a high level, a single map function now performs the original duties of both old map functions, while the reduce function performs one action or another based on a tag that tells which data set each record came from. The steps for merging two jobs are as follows.

1. Bring the code for the two mappers together. There are a couple of ways to do this. Copying and pasting the code works, but may make it harder to tell which piece of code is doing what; good in-code comments can compensate for this. The other method is to separate the code into two helper map functions, one that processes the input for each algorithm.

2. In the mapper, change the writing of the key and value to tag each output record with its map source. Tagging is critical so that records from the different maps don’t get mixed up.

Let’s say you are processing hlog and dsl data, which arrive in the same input file but carry different metrics. Tag the hlog records with H and the dsl records with D: if the dsl key is DEVICE_12345, write it as DDEVICE_12345, and write the equivalent hlog key as HDEVICE_12345. (The sample code below applies the same one-character tag to the start of the value instead, which keeps the grouping key identical for both data sets.) Either way, the tag lets us differentiate the data in the reducer; a minimal sketch of this tag-and-dispatch structure appears right after this list.

3. In the reducer, parse out the tag and use an if-statement to switch which reducer code actually gets executed. You can either copy and paste the code into the branches of the if-statement or have each branch call out to a helper function.

4. Use MultipleOutputs to separate the output of the two jobs. MultipleOutputs is a special output helper class that allows the same reducer to write to different folders of output, instead of just a single folder.
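To make the four steps concrete before the full example, here is a minimal, hypothetical sketch of the tag-and-dispatch structure. The class names (TaggingMapper, DispatchingReducer), the tags A/B, and the named outputs jobA/jobB are placeholders for illustration, not part of the hlog/dsl example that follows; the named outputs would also have to be registered in the driver with MultipleOutputs.addNamedOutput.

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

/* Hypothetical merged mapper: every output record is tagged with the job it belongs to. */
class TaggingMapper extends Mapper<Object, Text, Text, Text> {
    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // Job A's map logic, tagged with "A" (placeholder key and value).
        context.write(new Text("SOME_KEY"), new Text("A" + value.toString()));
        // Job B's map logic, tagged with "B" (placeholder key and value).
        context.write(new Text("SOME_KEY"), new Text("B" + value.toString()));
    }
}

/* Hypothetical merged reducer: the tag picks which job's reduce logic runs, and
   MultipleOutputs keeps the two result sets in separate folders. */
class DispatchingReducer extends Reducer<Text, Text, Text, Text> {
    private MultipleOutputs<Text, Text> mos;

    @Override
    protected void setup(Context context) {
        mos = new MultipleOutputs<Text, Text>(context);
    }

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            Text record = new Text(value.toString().substring(1)); // strip the tag
            if (value.charAt(0) == 'A') {
                // ... job A's reduce logic on record ...
                mos.write("jobA", key, record, "jobA/part");
            } else if (value.charAt(0) == 'B') {
                // ... job B's reduce logic on record ...
                mos.write("jobB", key, record, "jobB/part");
            }
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }
}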

Driver Code

We will use MultipleOutputs and register two named outputs, hlog and dsl, which the reducer uses to write each job’s results separately. Output counters for MultipleOutputs are disabled by default, so be sure to turn them on with MultipleOutputs.setCountersEnabled if you don’t expect a large number of named outputs.

import java.io.File;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class DriverJobMerging {

    public static void main(String[] args) throws Exception {

        /*
         * I have used my local path on Windows; change the paths as per your
         * local machine.
         */
        args = new String[] { "Replace this string with Input Path location",
                "Replace this string with output Path location" };

        if (args.length != 2) {
            System.err.println("Please specify the input and output path");
            System.exit(-1);
        }

        /* delete the output directory before running the job */
        FileUtils.deleteDirectory(new File(args[1]));

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(DriverJobMerging.class);
        job.setJobName("Job_Merging");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        /*
         * Register one named output per merged job. Named output names must be
         * alphanumeric, so they are simply called "hlog" and "dsl".
         */
        MultipleOutputs.addNamedOutput(job, "hlog", TextOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "dsl", TextOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.setCountersEnabled(job, true);

        job.setMapperClass(JobMergingMapper.class);
        job.setReducerClass(JobMergingReducer.class);
        job.setNumReduceTasks(10);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Mapper Code

The map method simply passes its parameters to two helper functions, each of which runs one job’s map logic and writes its output to the context. Each record is tagged with either D or H before it is written so that the reducer can tell the two data sets apart. In this way we have merged two jobs that need to run over the exact same massive amount of data: the data is loaded and parsed once, and one MapReduce job logically performs both computations without mixing the two applications.


import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class JobMergingMapper extends Mapper<Object, Text, Text, Text> {

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // Run both jobs' map logic over the same input record.
        hlogDataMap(key, value, context);
        dslDataMap(key, value, context);
    }

    // Map logic for the hlog data; output values are tagged with "H".
    public void hlogDataMap(Object key, Text value, Context context) throws IOException, InterruptedException {
        String[] values = value.toString().split(",", -1);
        if (values.length == 10) {
            String neid = values[5];
            String portid = values[6];
            Text outKey = new Text(neid + portid);
            Text outValue = new Text(
                    "H" + values[4] + "," + values[5] + "," + values[6] + "," + values[7] + "," + values[8]);
            context.write(outKey, outValue);
        }
    }

    // Map logic for the dsl data; output values are tagged with "D".
    public void dslDataMap(Object key, Text value, Context context) throws IOException, InterruptedException {
        String[] field = value.toString().split(",", -1);
        // A valid dsl record must have 64 comma-separated fields, since the
        // highest index read below is 63.
        if (field.length == 64) {
            String neid = field[2];
            String portid = field[3];
            Text outKey = new Text(neid + portid);
            Text outValue = new Text(
                    "D" + field[0] + "," + field[5] + "," + field[7] + "," + field[62] + "," + field[63]);
            context.write(outKey, outValue);
        }
    }
}

Reducer Code

The reducer’s setup and cleanup methods handle the creation and closing of the MultipleOutputs utility. The reduce method checks the tag on each input value, branches to the reduce logic for that data set, and writes the result to that data set’s named output.


import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class JobMergingReducer extends Reducer<Text, Text, Text, Text> {

    private MultipleOutputs<Text, Text> multipleOutputs = null;

    @Override
    protected void setup(Context context) {
        multipleOutputs = new MultipleOutputs<Text, Text>(context);
    }

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        for (Text value : values) {
            // Strip the one-character tag added by the mapper.
            Text record = new Text(value.toString().substring(1));

            if (value.charAt(0) == 'H') {
                // Call the algorithm specific to hlog here, then write the result
                // to the "hlog" named output (a subfolder of the output path).
                multipleOutputs.write("hlog", key, record, "hlog/part");
            } else if (value.charAt(0) == 'D') {
                // Call the algorithm specific to dsl here, then write the result
                // to the "dsl" named output (a subfolder of the output path).
                multipleOutputs.write("dsl", key, record, "dsl/part");
            }
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        multipleOutputs.close();
    }
}
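With this setup, each merged job’s results land in its own subfolder of the job output directory, for example output/hlog/part-r-00000 for the hlog records and output/dsl/part-r-00000 for the dsl records (the exact layout depends on the base output path passed to MultipleOutputs.write). Because the reducer writes everything through MultipleOutputs rather than context.write, the regular part-r-xxxxx files in the root of the output directory will be empty; calling LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class) in the driver is one way to suppress them.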