Chain Folding Optimization in MapReduce Using the ChainMapper and ChainReducer Approach

Chain folding is an optimization applied to MapReduce job chains. Start with the map phases in the chain: if multiple map phases are adjacent, merge them into one phase. This is the case when a map-only job (such as a replicated join) is followed by a numerical aggregation. Merging reduces the number of times we hit the disks. Consider a two-job chain in which the first job is map-only and the second is a traditional MapReduce job with a map phase and a reduce phase. Without this optimization, the first job writes its output to the distributed file system, and the second job then loads that data back in.

Instead, if we merge the map phase of the map-only job with the map phase of the traditional job, that temporary data never gets written, reducing the I/O significantly. Also, fewer tasks are started, reducing the overhead of task management. Chaining many map tasks together is an even more drastic optimization. In this case, there really isn’t any downside other than possibly having to alter existing code.
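To make the saving concrete, here is a minimal plain-Java sketch (no Hadoop classes involved). The two map steps, project and tag, are hypothetical stand-ins for the map-only job and the map phase of the following job; the intermediate list plays the role of the temporary data written to the distributed file system.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Illustration only: project() and tag() are hypothetical map steps.
class MapFoldingSketch {

    // First map step: keep only the subelement type and the byte count.
    static String project(String line) {
        String[] f = line.split(",");
        return f[2] + "," + f[5];
    }

    // Second map step: tag the record for the aggregation that follows.
    static String tag(String record) {
        return "bytes:" + record;
    }

    // Unfolded chain: the first pass materialises every intermediate record
    // before the second pass reads it back.
    static List<String> twoPasses(List<String> input, List<String> intermediate) {
        for (String line : input) {
            intermediate.add(project(line));
        }
        List<String> out = new ArrayList<>();
        for (String record : intermediate) {
            out.add(tag(record));
        }
        return out;
    }

    // Folded chain: both steps run back to back on each record, so no
    // intermediate collection is built at all.
    static List<String> folded(List<String> input) {
        Function<String, String> first = MapFoldingSketch::project;
        Function<String, String> chain = first.andThen(MapFoldingSketch::tag);
        List<String> out = new ArrayList<>();
        for (String line : input) {
            out.add(chain.apply(line));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
                "DEVICE_12345,/shelf=0/slot=0/port=1,PTP,40000,zuyrt800_001_00_00U200_001_00_00,4343",
                "DEVICE_12345,/shelf=0/slot=0/port=2,ETH,40000,zuyrt800_001_00_00U200_001_00_00,121212");
        List<String> intermediate = new ArrayList<>();
        System.out.println(twoPasses(input, intermediate).equals(folded(input))); // prints true
        System.out.println(intermediate.size()); // prints 2
    }
}
```

Both variants produce the same output; the folded one simply never holds the intermediate records, which is the part that costs disk I/O in a real job chain.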

If the job ends with a map phase (combined or otherwise), push that phase into the reducer right before it. This is a special case with the same performance benefits as the previous step. It removes the I/O of writing temporary data out and then running a map-only job on it. It also reduces the task start-up overhead.

Note that the first map phase of the chain cannot benefit from this next optimization. As much as possible, split up each map phase between operations that decrease the amount of data (e.g., filtering) and operations that increase the amount of data (e.g., enrichment). In some cases, this is not possible because you may need some enrichment data in order to do the filtering. In these cases, look at dependent phases as one larger phase that cumulatively increases or decreases the amount of data. Push the processes that decrease the amount of data into the previous reducer, while keeping the processes that increase the amount of data where they are. This step is a bit more complex and the difference is more subtle. The gain is that if you push data-reducing map-phase processing into the previous reducer, you reduce the amount of data written to temporary storage, as well as the amount of data loaded off disk into the next part of the chain. This can be pretty significant if a drastic amount of filtering is done.
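As a rough illustration of pushing a data-reducing step into the previous reducer, the plain-Java sketch below sums byte counts per key and applies a filter before anything is "written". The minimum-bytes threshold is a made-up filter chosen only for the example; it does not come from any job described here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: the minBytes filter is a hypothetical example.
class ReducerSideFilterSketch {

    // Reducer logic: sum the byte counts of "TYPE,bytes" records per type.
    static Map<String, Long> reduce(List<String> typeAndBytes) {
        Map<String, Long> totals = new LinkedHashMap<>();
        for (String record : typeAndBytes) {
            String[] f = record.split(",");
            totals.merge(f[0], Long.parseLong(f[1]), Long::sum);
        }
        return totals;
    }

    // Filter folded into the reducer: totals below the threshold are dropped
    // before they would reach temporary storage.
    static List<String> reduceAndFilter(List<String> typeAndBytes, long minBytes) {
        List<String> written = new ArrayList<>();
        for (Map.Entry<String, Long> e : reduce(typeAndBytes).entrySet()) {
            if (e.getValue() >= minBytes) {
                written.add(e.getKey() + "\t" + e.getValue());
            }
        }
        return written;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("PTP,4343", "PTP,5656", "ETH,121212", "SYS,12121");
        System.out.println(reduce(input).size());                 // prints 3
        System.out.println(reduceAndFilter(input, 10000).size()); // prints 2
    }
}
```

Without the fold, all three totals would be written out and read back by the next job just to discard one of them; with it, only two records ever leave the reducer.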

Let’s take an example from the telecom domain. Suppose we have an EMS feed like the sample below. A map-only job separates the records by subelement type (PTP, ETH, or SYS in the sample data), and a traditional MapReduce job then processes the output of that first map to run some analytics. If we merge the map phase of the map-only job with the map phase of the following MapReduce job, the temporary data never gets written, reducing the I/O significantly. Also, fewer tasks are started, reducing the overhead of task management.


DEVICE_12345,/shelf=0/slot=0/port=1,PTP,40000,zuyrt800_001_00_00U200_001_00_00,4343
DEVICE_12345,/shelf=0/slot=0/port=1,PTP,41500,zuyrt800_001_00_00U200_001_00_00,5656
DEVICE_12345,/shelf=0/slot=0/port=1,PTP,43000,zuyrt800_001_00_00U200_001_00_00,44545
DEVICE_12345,/shelf=0/slot=0/port=1,PTP,44500,zuyrt800_001_00_00U200_001_00_00,454545
DEVICE_12345,/shelf=0/slot=0/port=2,ETH,40000,zuyrt800_001_00_00U200_001_00_00,121212
DEVICE_12345,/shelf=0/slot=0/port=2,ETH,41500,zuyrt800_001_00_00U200_001_00_00,343434
DEVICE_12345,/shelf=0/slot=0/port=2,ETH,43000,zuyrt800_001_00_00U200_001_00,4343434
DEVICE_12345,/shelf=0/slot=0/port=2,ETH,44500,zuyrt800_001_00_00U200_001_00_00,565656
DEVICE_12345,/shelf=0/slot=0/port=3,SYS,40000,zuyrt800_001_00_00U200_001_00_00,676767
DEVICE_12345,/shelf=0/slot=0/port=3,SYS,41500,zuyrt800_001_00_00U200_001_00_00,12121
DEVICE_12345,/shelf=0/slot=0/port=3,SYS,43000,zuyrt800_001_00_00U200_001_00_00,98989
DEVICE_12345,/shelf=0/slot=0/port=3,SYS,44500,zuyrt800_001_00_00U200_001_00_00,545454
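The separation step boils down to parsing each line and keying it by its third field. The plain-Java sketch below shows that logic on its own, assuming six comma-separated fields per record as in the sample; the class and method names are hypothetical, and the fourth and fifth fields are left uninterpreted because their meaning is not stated here.

```java
// Illustration only: a hypothetical parser for one line of the sample feed.
class EmsRecordSketch {
    final String deviceId;        // field 0
    final String portId;          // field 1
    final String subelementType;  // field 2: PTP, ETH or SYS in the sample
    final long bytesConsumed;     // field 5: the last field

    private EmsRecordSketch(String deviceId, String portId,
                            String subelementType, long bytesConsumed) {
        this.deviceId = deviceId;
        this.portId = portId;
        this.subelementType = subelementType;
        this.bytesConsumed = bytesConsumed;
    }

    // Parses one feed line; fields 3 and 4 are skipped on purpose.
    static EmsRecordSketch parse(String line) {
        String[] f = line.split(",");
        if (f.length != 6) {
            throw new IllegalArgumentException("expected 6 comma-separated fields: " + line);
        }
        return new EmsRecordSketch(f[0], f[1], f[2], Long.parseLong(f[5].trim()));
    }

    // Key/value pair a separating mapper could emit: type as key, line as value.
    static String[] toKeyValue(String line) {
        return new String[] { parse(line).subelementType, line };
    }

    public static void main(String[] args) {
        String line = "DEVICE_12345,/shelf=0/slot=0/port=1,PTP,40000,zuyrt800_001_00_00U200_001_00_00,4343";
        System.out.println(toKeyValue(line)[0]);       // prints PTP
        System.out.println(parse(line).bytesConsumed); // prints 4343
    }
}
```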

ChainMapper and ChainReducer Approach

ChainMapper and ChainReducer are special mapper and reducer classes that allow you to run multiple map phases in the mapper and multiple map phases after the reducer. You are effectively expanding the traditional map and reduce paradigm into several map phases, followed by a reduce phase, followed by several map phases. However, the framework still invokes only one map phase and one reduce phase. Each chained map phase feeds into the next in the pipeline: the output of the first is processed by the second, whose output is processed by the third, and so on. The map phases on the back end of the reducer take the output of the reducer and do additional computation. This is useful for post-processing operations or additional filtering.
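The shape of such a chain (several mappers, one reducer, several more mappers) can be modelled in a few lines of plain Java. This is only a toy model under simplifying assumptions: records are plain strings, every mapper emits exactly one record per input, and grouping happens in memory. It says nothing about how Hadoop implements the chain internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.UnaryOperator;

// Toy model of a [MAP+ / REDUCE MAP*] chain; all names are hypothetical.
class ChainPipelineSketch {

    static List<String> run(List<String> input,
                            List<UnaryOperator<String>> preMappers,
                            Function<String, String> keyOf,
                            BiFunction<String, List<String>, String> reducer,
                            List<UnaryOperator<String>> postMappers) {
        // Map side: each record flows through every chained mapper in order,
        // then is grouped by key (a stand-in for shuffle and sort).
        Map<String, List<String>> groups = new TreeMap<>();
        for (String record : input) {
            String current = record;
            for (UnaryOperator<String> mapper : preMappers) {
                current = mapper.apply(current);
            }
            groups.computeIfAbsent(keyOf.apply(current), k -> new ArrayList<>()).add(current);
        }
        // Reduce side: reduce each group, then pass the result through the
        // mappers chained after the reducer.
        List<String> output = new ArrayList<>();
        for (Map.Entry<String, List<String>> group : groups.entrySet()) {
            String current = reducer.apply(group.getKey(), group.getValue());
            for (UnaryOperator<String> mapper : postMappers) {
                current = mapper.apply(current);
            }
            output.add(current);
        }
        return output;
    }

    // Demo chain: trim, upper-case, count records per key, wrap in brackets.
    static List<String> demo() {
        List<UnaryOperator<String>> pre = new ArrayList<>();
        pre.add(s -> s.trim());
        pre.add(s -> s.toUpperCase());
        List<UnaryOperator<String>> post = new ArrayList<>();
        post.add(s -> "[" + s + "]");
        List<String> input = new ArrayList<>();
        input.add(" ptp,1 ");
        input.add("eth,2");
        input.add("ptp,3");
        return run(input, pre, s -> s.split(",")[0],
                (key, values) -> key + "=" + values.size(), post);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [[ETH=1], [PTP=2]]
    }
}
```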

Suppose we have an EMS DSL feed like the one below. The first mapper separates the records by subelement type (PTP, ETH, or SYS in the sample data). The second mapper then enriches each record with its service id, which is loaded during the setup phase via the DistributedCache. Sample data for both feeds follows.

dsl feed

DEVICE_12345,/shelf=0/slot=0/port=1,PTP,40000,zuyrt800_001_00_00U200_001_00_00,4343
DEVICE_12345,/shelf=0/slot=0/port=1,PTP,41500,zuyrt800_001_00_00U200_001_00_00,5656
DEVICE_12345,/shelf=0/slot=0/port=1,PTP,43000,zuyrt800_001_00_00U200_001_00_00,44545
DEVICE_12345,/shelf=0/slot=0/port=1,PTP,44500,zuyrt800_001_00_00U200_001_00_00,454545
DEVICE_12345,/shelf=0/slot=0/port=2,ETH,40000,zuyrt800_001_00_00U200_001_00_00,121212
DEVICE_12345,/shelf=0/slot=0/port=2,ETH,41500,zuyrt800_001_00_00U200_001_00_00,343434
DEVICE_12345,/shelf=0/slot=0/port=2,ETH,43000,zuyrt800_001_00_00U200_001_00,4343434
DEVICE_12345,/shelf=0/slot=0/port=2,ETH,44500,zuyrt800_001_00_00U200_001_00_00,565656
DEVICE_12345,/shelf=0/slot=0/port=3,SYS,40000,zuyrt800_001_00_00U200_001_00_00,676767
DEVICE_12345,/shelf=0/slot=0/port=3,SYS,41500,zuyrt800_001_00_00U200_001_00_00,12121
DEVICE_12345,/shelf=0/slot=0/port=3,SYS,43000,zuyrt800_001_00_00U200_001_00_00,98989
DEVICE_12345,/shelf=0/slot=0/port=3,SYS,44500,zuyrt800_001_00_00U200_001_00_00,545454

service id feed

service_123,DEVICE_12345,/shelf=0/slot=0/port=1
service_124,DEVICE_12345,/shelf=0/slot=0/port=2
service_125,DEVICE_12345,/shelf=0/slot=0/port=3
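The enrichment itself is a lookup keyed by device id and port. Below is a minimal plain-Java sketch of that lookup, with the constructor standing in for the mapper's setup step. The class name, the output layout (service id prepended to the original record), and the "UNKNOWN" fallback for unmatched ports are all assumptions made for the example.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration only: an in-memory version of the service id lookup.
class ServiceIdEnrichmentSketch {
    private final Map<String, String> serviceIdByDeviceAndPort = new HashMap<>();

    // Stand-in for setup: index "serviceId,deviceId,port" lines by device and port.
    ServiceIdEnrichmentSketch(List<String> serviceIdFeed) {
        for (String line : serviceIdFeed) {
            String[] f = line.split(",", 3);
            serviceIdByDeviceAndPort.put(f[1] + "," + f[2], f[0]);
        }
    }

    // Prepends the matching service id to a DSL record. Unmatched records get
    // the placeholder "UNKNOWN" (an assumption, not stated behaviour).
    String enrich(String dslRecord) {
        String[] f = dslRecord.split(",", 3);
        String serviceId = serviceIdByDeviceAndPort.getOrDefault(f[0] + "," + f[1], "UNKNOWN");
        return serviceId + "," + dslRecord;
    }

    public static void main(String[] args) {
        ServiceIdEnrichmentSketch lookup = new ServiceIdEnrichmentSketch(Arrays.asList(
                "service_123,DEVICE_12345,/shelf=0/slot=0/port=1",
                "service_124,DEVICE_12345,/shelf=0/slot=0/port=2"));
        System.out.println(lookup.enrich(
                "DEVICE_12345,/shelf=0/slot=0/port=2,ETH,40000,zuyrt800_001_00_00U200_001_00_00,121212"));
    }
}
```

Because the whole service id feed is held in memory, this approach only suits a lookup file small enough to fit in each map task's heap.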

These two mapper classes are then chained together to feed a single reducer. The reducer is a basic aggregator that sums the total bytes consumed, which is the last field in the DSL feed. It outputs the service id, device id, port id, and subelement type along with the aggregated bytes consumed. Finally, a third mapper bins the records by subelement type: PTP, SYS, or ETH.
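The reducer and the binning step can be sketched in plain Java as follows. The sketch assumes the enriched record is the service id followed by the original DSL fields, so the first four fields form the key and the last field is the byte count; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustration only: aggregation and binning logic without any Hadoop types.
class AggregateAndBinSketch {

    // Reducer logic: total bytes per "serviceId,deviceId,portId,type" key.
    static Map<String, Long> totalBytes(List<String> enrichedRecords) {
        Map<String, Long> totals = new TreeMap<>();
        for (String line : enrichedRecords) {
            String[] f = line.split(",");
            String key = f[0] + "," + f[1] + "," + f[2] + "," + f[3];
            totals.merge(key, Long.parseLong(f[f.length - 1]), Long::sum);
        }
        return totals;
    }

    // Binning logic: route each aggregated record to the bin named after its
    // subelement type, which is the last component of the key.
    static Map<String, List<String>> bin(Map<String, Long> totals) {
        Map<String, List<String>> bins = new TreeMap<>();
        for (Map.Entry<String, Long> e : totals.entrySet()) {
            String key = e.getKey();
            String type = key.substring(key.lastIndexOf(',') + 1);
            bins.computeIfAbsent(type, t -> new ArrayList<>()).add(key + "," + e.getValue());
        }
        return bins;
    }

    public static void main(String[] args) {
        List<String> enriched = Arrays.asList(
                "service_123,DEVICE_12345,/shelf=0/slot=0/port=1,PTP,40000,x,4343",
                "service_123,DEVICE_12345,/shelf=0/slot=0/port=1,PTP,41500,x,5656",
                "service_124,DEVICE_12345,/shelf=0/slot=0/port=2,ETH,40000,x,121212");
        System.out.println(bin(totalBytes(enriched)).keySet()); // prints [ETH, PTP]
    }
}
```

In the actual job, each bin would correspond to one named output rather than an in-memory list.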

Let’s name the mappers DsllDataMapper, ServiceInventoryDataMapper, and BinningMapper, and the reducer TotalBytesConsumedReducer.

Driver Code

The driver handles configuration of the ChainMapper and ChainReducer. The most interesting piece here is adding mappers and setting the reducer. The order in which they are added affects the execution of the different mapper implementations. ChainMapper is first used to add the two map implementations that will be called back to back before any sorting and shuffling occurs. Then, the ChainReducer static methods are used to set the reducer implementation, and then finally a mapper on the end. Note that you don’t use ChainMapper to add a mapper after a reducer: use ChainReducer.

Each method takes the job's JobConf, the mapper or reducer class, the input and output key and value types, a byValue flag, and another JobConf that is local to that mapper or reducer. The local JobConf is useful when mappers or reducers in the chain have overlapping configuration parameters. No special configuration is required here, so we simply pass in empty JobConf objects. The seventh parameter is the flag that controls whether keys and values are passed along the chain by value or by reference. Passing by reference is an added optimization you can use if the collector does not modify the keys or values in either the mapper or the reducer. Here, we make these assumptions, so we pass objects by reference (byValue = false). We also configure MultipleOutputs and use a NullOutputFormat rather than the typical TextOutputFormat. Using this output format prevents the framework from creating the default empty part files.
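Why the byValue flag matters is easier to see outside Hadoop. The plain-Java sketch below uses a hypothetical mutable Holder class as a stand-in for a reusable object such as Text: the upstream stage reuses one instance for every record, and the downstream stage keeps whatever it receives. It illustrates the general hazard of sharing mutable objects, not Hadoop's internal mechanics.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: Holder is a hypothetical stand-in for a mutable Writable.
class ByValueSketch {

    static final class Holder {
        String value;
        Holder(String value) { this.value = value; }
        Holder copy() { return new Holder(value); }
    }

    // Upstream reuses a single Holder; downstream keeps what it is handed.
    // With byValue the downstream stage gets its own copy of each record;
    // by reference it gets the shared instance.
    static List<String> collect(List<String> inputs, boolean byValue) {
        Holder reused = new Holder("");
        List<Holder> kept = new ArrayList<>();
        for (String s : inputs) {
            reused.value = s;
            kept.add(byValue ? reused.copy() : reused);
        }
        List<String> seen = new ArrayList<>();
        for (Holder h : kept) {
            seen.add(h.value);
        }
        return seen;
    }

    public static void main(String[] args) {
        List<String> inputs = Arrays.asList("PTP", "ETH");
        System.out.println(collect(inputs, true));  // prints [PTP, ETH]
        System.out.println(collect(inputs, false)); // prints [ETH, ETH]
    }
}
```

Passing by reference skips the copy, which is where the saving comes from, but it is only safe when no stage holds on to or changes an object that another stage still relies on.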

Note: Although we add multiple mappers and a reducer to the chain, only one map phase and one reduce phase are ever invoked.


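import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.ChainReducer;
import org.apache.hadoop.mapred.lib.MultipleOutputs;
import org.apache.hadoop.mapred.lib.NullOutputFormat;

public class ChainMapperDriver {
    public static void main(String[] args) throws Exception {
        // JobConf(String) expects a configuration file path, so set the job name explicitly.
        JobConf conf = new JobConf();
        conf.setJobName("ChainMapperReducer");
        conf.setJarByClass(ChainMapperDriver.class);

        Path dslInput = new Path(args[0]);
        Path serviceIdInput = new Path(args[1]);
        Path outputDir = new Path(args[2]);

        // Map side of the chain. TextInputFormat supplies LongWritable byte offsets as keys,
        // so this assumes DsllDataMapper accepts a LongWritable (or Object) input key.
        ChainMapper.addMapper(conf, DsllDataMapper.class, LongWritable.class, Text.class,
                Text.class, Text.class, false, new JobConf(false));
        ChainMapper.addMapper(conf, ServiceInventoryDataMapper.class, Text.class, Text.class,
                Text.class, Text.class, false, new JobConf(false));

        // Reduce side of the chain: the reducer, then the mapper that runs after it.
        ChainReducer.setReducer(conf, TotalBytesConsumedReducer.class, Text.class, Text.class,
                Text.class, Text.class, false, new JobConf(false));
        ChainReducer.addMapper(conf, BinningMapper.class, Text.class, Text.class,
                Text.class, Text.class, false, new JobConf(false));

        conf.setInputFormat(TextInputFormat.class);
        TextInputFormat.setInputPaths(conf, dslInput);

        // Configure multiple outputs; NullOutputFormat suppresses the default empty part files.
        conf.setOutputFormat(NullOutputFormat.class);
        FileOutputFormat.setOutputPath(conf, outputDir);
        MultipleOutputs.addNamedOutput(conf, "PTP", TextOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(conf, "ETH", TextOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(conf, "SYS", TextOutputFormat.class, Text.class, Text.class);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        // Add the service id files to the DistributedCache
        FileStatus[] serviceIdFiles = FileSystem.get(conf).listStatus(serviceIdInput);
        for (FileStatus status : serviceIdFiles) {
            DistributedCache.addCacheFile(status.getPath().toUri(), conf);
        }

        // runJob submits the job and blocks until it finishes, so no polling loop is needed.
        RunningJob job = JobClient.runJob(conf);
        System.exit(job.isSuccessful() ? 0 : 1);
    }
}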