mapreduce example to bin the data using multipleoutputs of hadoop framework

Binning is very similar to partitioning and can often be used to solve the same problem. The major difference is in how the bins or partitions are built using the MapReduce framework; in some situations, one solution works better than the other. Binning splits the data up in the map phase instead of in the partitioner. This has the major advantage of eliminating the need for a reduce phase, usually leading to more efficient resource allocation.

The downside is that each mapper will now have one file per possible output bin. This means that, if you have a thousand bins and a thousand mappers, you are going to output a total of one million files. This is bad for NameNode scalability and follow-on analytics. The partitioning pattern will have one output file per category and does not have this problem.

Structure

This pattern’s driver is unique in its use of the MultipleOutputs class, which sets up the job’s output to write multiple distinct files. The mapper looks at each line, then iterates through a list of criteria for each bin. If the record meets a bin’s criteria, it is sent to that bin.

Notes: No combiner, partitioner, or reducer is used in this pattern. Each mapper outputs one small file per bin. Data should not be left as a bunch of tiny files; at some point, you should run some postprocessing that collects the outputs into larger files, as sketched below.
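As a minimal sketch of that postprocessing, assuming the bin files all land in the job's single output directory and using hypothetical paths, the part files of each bin can be concatenated with the HDFS FileSystem API:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class MergeBins {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        /* hypothetical output directory of the binning job */
        Path outputDir = new Path("/output/binning");

        for (String bin : new String[] { "ptp", "sys", "eth" }) {
            /* match every part file written for this bin, e.g. ptp-m-00000, ptp-m-00001, ... */
            FileStatus[] parts = fs.globStatus(new Path(outputDir, bin + "-m-*"));
            FSDataOutputStream out = fs.create(new Path(outputDir, bin + ".csv"));
            for (FileStatus part : parts) {
                FSDataInputStream in = fs.open(part.getPath());
                /* false: keep the merged output stream open across part files */
                IOUtils.copyBytes(in, out, conf, false);
                in.close();
            }
            out.close();
        }
    }
}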

Achieving this in Pig

SPLIT input_data INTO police IF department == 'police',
    law IF department == 'law',
    admin IF department == 'admin';
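Note that SPLIT only defines the three relations. Pig evaluates lazily, so each relation still needs its own STORE statement (or further processing) before the splits are actually materialized.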
Performance analysis

This pattern has the same scalability and performance properties as other map-only jobs. No sort, shuffle, or reduce needs to be performed, and most of the processing is going to be done on data that is local.

Problem to solve

Let’s solve a problem in the telecom domain. The data received from the EMS is tagged with a subelement type, and we want to bin the data based on that type. For this example we will use three subelement types, PTP, SYS, and ETH, and bin the records into the respective files.

Here is a sample of the input data, attached as ems_data.csv.
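The file itself is not reproduced here, but based on the mapper code below, each record is a comma-separated line of more than 40 fields whose third column (index 2) carries the subelement type. A few hypothetical, heavily truncated rows just to illustrate the layout:

NE01,CARD1,PTP,2016-01-01 00:00:00,...
NE01,CARD2,SYS,2016-01-01 00:05:00,...
NE02,CARD1,ETH,2016-01-01 00:10:00,...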

Driver code

We will use MultipleOutputs, which takes a named output, datatype, that is used in the mapper to write to the different bins. Output counters are disabled by default, so be sure to turn them on if you don’t expect a large number of named outputs (a sketch follows the driver code). We also set the number of reduce tasks to zero, as this is a map-only job.

import java.io.File;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class DriverBinning {

    public static void main(String[] args) throws Exception {

        /*
         * I have used my local path in windows; change the path as per your
         * local machine
         */
        args = new String[] { "Replace this string with Input Path location",
                "Replace this string with output Path location" };

        /* delete the output directory before running the job */
        FileUtils.deleteDirectory(new File(args[1]));

        /* set the hadoop system parameter */
        System.setProperty("hadoop.home.dir", "Replace this string with hadoop home directory location");

        if (args.length != 2) {
            System.err.println("Please specify the input and output path");
            System.exit(-1);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(DriverBinning.class);
        job.setJobName("Binning_Data");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        /* register the named output "datatype" the mapper writes to */
        MultipleOutputs.addNamedOutput(job, "datatype", TextOutputFormat.class, Text.class, NullWritable.class);

        job.setMapperClass(BinningMapper.class);

        /* map-only job: no reducers */
        job.setNumReduceTasks(0);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
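Two optional tweaks, not used in the driver above but worth knowing about; this is a sketch using two calls from the standard mapreduce output library:

/* enable the per-named-output record counters mentioned earlier */
MultipleOutputs.setCountersEnabled(job, true);

/* create the default output lazily, so the empty part-m-xxxxx files are
   not written; all real output here goes through MultipleOutputs */
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

LazyOutputFormat also lives in org.apache.hadoop.mapreduce.lib.output, so only one extra import is needed.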

Mapper code

The setup method creates an instance of MultipleOutputs using the context. The mapper consists of several if-else statements that check the type tag of each EMS record against one of our tags of interest. If the record carries the tag, it is written to that bin; since each record has a single type field, it lands in at most one bin. Be sure to close the MultipleOutputs during cleanup; otherwise, you may not have much output at all.

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class BinningMapper extends Mapper<Object, Text, Text, NullWritable> {

    private MultipleOutputs<Text, NullWritable> multipleOutputs = null;

    @Override
    protected void setup(Context context) {
        multipleOutputs = new MultipleOutputs<Text, NullWritable>(context);
    }

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

        String[] field = value.toString().split(",", -1);

        /* guard against short or malformed records */
        if (field.length > 40) {

            String type = field[2];

            if (type.equalsIgnoreCase("PTP")) {
                multipleOutputs.write("datatype", value, NullWritable.get(), "ptp");
            } else if (type.equalsIgnoreCase("SYS")) {
                multipleOutputs.write("datatype", value, NullWritable.get(), "sys");
            } else if (type.equalsIgnoreCase("ETH")) {
                multipleOutputs.write("datatype", value, NullWritable.get(), "eth");
            }
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        /* close MultipleOutputs, or output may be lost */
        multipleOutputs.close();
    }
}

Output of the program

The output will have a separate file for each subelement type: eth-m-00000, ptp-m-00000, and sys-m-00000, containing the ETH, PTP, and SYS data respectively. The prefix of each file name is the base output path passed as the fourth argument to multipleOutputs.write, followed by -m- and the mapper's part number.