mapreduce example to sort the data using the total order partitioner and input sampler utility

Sorting is easy in sequential programming. Sorting in MapReduce, or more generally in parallel, is not easy. This is because the typical divide and conquer approach is a bit harder to apply here.Each individual reducer will sort its data by key, but unfortunately, this sorting is not global across all data. What we want to do here is a total order sorting where, if you concatenate the output files, the records are sorted. If we just concatenate the output of a simple MapReduce job, segments of the data will be sorted, but the whole set will not be.

This pattern has two phases: an analyze phase that determines the ranges, and the order phase that actually sorts the data. The analyze phase is optional in some ways. You need to run it only once if the distribution of your data does not change quickly over time, because the value ranges it produces will continue to perform well. Also, in some cases, you may be able to guess the partitions yourself, especially if the data is evenly distributed.For example if we know that there are 1 million employees and 1000 reducers each range is going to be of 1000.This is because employees by id should be spread out evenly and since you know the number of total employees, you can divide that number by the number of reducers you want to use to get the range for each reducer.

Structure of pattern

First Job

1. We pass the input data through a mapper and output the sort key as its output key as doublewritable and the value as is and we store in the SequenceFileOutputFormat.
2. We are not using any reducer in the initial phase and setting the number of reducer to 0.
3. The output of this mapper which is a first stage processing before sorting the data is stored in the staging location which is a intermediate location.

Second Job

1. The input for this job is the output of the previous job.
2. We are using InputSampler.writePartitionFile to create a partition file which takes a job instance and RandomSampler instance for which we pass the frequency Probability with which a key will be chosen and numSamples which is a total number of samples to obtain from all selected splits as a constructor parameter.
3. A custom partitioner is used that loads up the partition file. In Hadoop, you can use the TotalOrderPartitioner, which is built specifically for this purpose. It takes the data ranges from the partition file produced in the previous step and decides which reducer to send the data to.
4. The reducer’s job here is simple. The shuffle and sort take care of the heavy lifting. The reduce function simply takes the values that have come in and outputs them.The number of reducers needs to be equal to the number of partitions for the TotalOrderPartitioner to work properly.
5. The output files will contain sorted data, and the output file names will be sorted such that the data is in a total sorting. In Hadoop, you’ll be able to issue hadoop fs -cat output/part-r-* and retrieve the data in a sorted manner.

Note : That the number of ranges in the intermediate partition needs to be equal to the number of reducers in the order step. If you decide to change the number of reducers and you’ve been reusing the same file,you’ll need to rebuild it.

Note : Using Text for nearly everything in Hadoop is very natural since that’s the format in which data is coming in. Be careful when sorting on numerical data, though! The string “20000” is less than than “9” if they are compared as strings, which is not what we want. Either pad the numbers with zeros or use a numerical data type.

Achieving the same in pig

Ordering in Pig is syntactically pretty easy, but it’s a very expensive operation. Behind the scenes, it will run a multi-stage MapReduce job to first find the partitions, and then perform the actual sort.

employee_sorted_data = ORDER employee_data BY salary;
Performance analysis

This operation is expensive because you effectively have to load and parse the data twice first to build the partition ranges, and then to actually sort the data. The job that builds the partitions is straightforward and efficient since it has only one reducer and sends a minimal amount of data over the network. The output file is small, so writing it out is trivial. Also, you may only have to run this now and then, which will amortize the cost of building it over time. The order step of the job has performance characteristics similar to the other data organization patterns, because it has to move all of the data over the network and write all of the data back out. Therefore, you should use a relatively large number of reducers.

Problem to solve

1. Given a list of employees with there information sort the data based on the salary using the total order sorting.

Here is a sample input data attached employee_info.csv

Input Data sample

First Name,Last Name,Job Titles,Department,Full or Part-Time,Salary or Hourly,Typical Hours,Annual Salary,Hourly Rate

dubert,tomasz ,paramedic i/c,fire,f,salary,,91080.00,
edwards,tim p,lieutenant,fire,f,salary,,114846.00,
elkins,eric j,sergeant,police,f,salary,,104628.00,
estrada,luis f,police officer,police,f,salary,,96060.00,
ewing,marie a,clerk iii,police,f,salary,,53076.00,
finn,sean p,firefighter,fire,f,salary,,87006.00,
fitch,jordan m,law clerk,law,f,hourly,35,,14.51

Driver Code

We create input and output variables from args which will be passed from oozie or a command line in real cluster. It creates path files to the partition list and the staging directory. The partition list is used by the TotalOrderPartitioner to make sure the key/value pairs are sorted properly. The staging directory is used to store intermediate output between the two jobs. The main thing to note is that the first job is a map-only only job that uses a SequenceFileOutputFormat.

The second job uses the identity mapper and our reducer implementation. The input is the output from the first job, so we’ll use the identity mapper to output the key/value pairs as they are stored from the output. The job is configured to 10 reducers, but any reasonable number can be used. Next, the partition file is configured, even though we have not created it yet which get created at line InputSampler.writePartitionFile(orderJob, new InputSampler.RandomSampler<Text, Text>(sample, 10000)) .

The next important line uses the InputSampler utility. This sampler writes the partition file by reading through the configured input directory of the job. Using the RandomSampler, it takes a configurable number of samples of the previous job’s output. This can be an expensive operation, as the entire output is read using this constructor.

Note : If your data distribution is unlikely to change, it would be worthwhile to keep this partition file around. It can then be used over and over again for this job in the future as new data arrives on the system.

import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

public class DriverTotalOrderSorting {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

args = new String[] { "Replace this string with Input Path location","Replace this string with output Path location","Replace this string with staging Path location","Replace this string with partition folder Path location","Replace this string with partition file Path location" };

FileUtils.deleteDirectory(new File(args[1]));
FileUtils.deleteDirectory(new File(args[2]));
FileUtils.deleteDirectory(new File(args[3]));
Configuration conf = new Configuration();
Path inputPath = new Path(args[0]);
Path partitionFile = new Path(args[4]);
Path outputStage = new Path(args[2]);
Path outputOrder = new Path(args[1]);
Job sampleJob = Job.getInstance(conf);
sampleJob.setJobName("sorting_first_level_job");
sampleJob.setJarByClass(DriverTotalOrderSorting.class);
sampleJob.setMapperClass(SalarySortingMapper.class);
sampleJob.setNumReduceTasks(0);
sampleJob.setOutputKeyClass(DoubleWritable.class);
sampleJob.setOutputValueClass(Text.class);
TextInputFormat.setInputPaths(sampleJob, inputPath);
sampleJob.setOutputFormatClass(SequenceFileOutputFormat.class);
TextOutputFormat.setOutputPath(sampleJob, outputStage);
int code = sampleJob.waitForCompletion(true) ? 0 : 1;

if (code == 0) {
Job orderJob = Job.getInstance(conf);
orderJob.setJarByClass(DriverTotalOrderSorting.class);
orderJob.setJobName("Sorting_Phase");
// Identity mapper to output the key/value pairs in the SequenceFile
orderJob.setMapperClass(Mapper.class);
orderJob.setReducerClass(SortingValueReducer.class);
orderJob.setNumReduceTasks(10);
// Use Hadoop's TotalOrderPartitioner class
orderJob.setPartitionerClass(TotalOrderPartitioner.class);
// Set the partition file
TotalOrderPartitioner.setPartitionFile(orderJob.getConfiguration(), partitionFile);
orderJob.setOutputKeyClass(DoubleWritable.class);
orderJob.setOutputValueClass(Text.class);
// Set the input to the previous job's output
orderJob.setInputFormatClass(SequenceFileInputFormat.class);
TextInputFormat.setInputPaths(orderJob, outputStage);
// Set the output path to the command line parameter
TextOutputFormat.setOutputPath(orderJob, outputOrder);
orderJob.getConfiguration().set("mapreduce.output.textoutputformat.separator", "");
// Use the InputSampler to go through the output of the previous
// job, sample it, and create the partition file
double sample = 20d;
InputSampler.writePartitionFile(orderJob, new InputSampler.RandomSampler<Text, Text>(sample, 10000));
// Submit the job
code = orderJob.waitForCompletion(true) ? 0 : 1;
}
System.exit(code);

}
}

First level Mapper Code

This mapper simply pulls the salary information from the data for each employee and sets it as the sort key for the record. The input value is output along with it. These key/value pairs, per our job configuration, are written to a SequenceFile that is used to create the
partition list for the TotalOrderPartitioner. There is no reducer for this job.

import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SalarySortingMapper extends Mapper<Object, Text, DoubleWritable, Text> {
private DoubleWritable salaryKey = new DoubleWritable();

public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String data = value.toString();
String[] field = data.split(",", -1);
Double salary=null;
if (null != field && field.length == 9 && field[7].length() > 0) {
salary = Double.parseDouble(field[7]);
salaryKey.set(salary);
context.write(salaryKey, value);
}
}
}

 

Sorting Mapper Code

This job simply uses the identity mapper to take each input key/value pair and output them. No special configuration or implementation is needed.

Sorting reducer code

Because the TotalOrderPartitioner took care of all the sorting, all the reducer needs to do is output the values with a NullWritable object. This will produce a part file for this reducer that is sorted by salary. The partitioner ensures that the concatenation of all these part files produces a totally ordered data set.

import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SortingValueReducer extends Reducer<DoubleWritable, Text, Text, NullWritable> {
public void reduce(DoubleWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text t : values) {
context.write(t, NullWritable.get());
}
}
}