MapReduce pattern for calculating minimum, maximum, and count

Numerical summarization is a MapReduce pattern that can be used to find the minimum, maximum, average, median, and standard deviation of a dataset. This pattern applies when the data you want to aggregate is numerical and can be grouped by specific fields. Numerical summarization gives you a top-level view of your data. For some summarization functions, a combiner can greatly reduce the number of intermediate key/value pairs sent across the network to the reducers, but only if the aggregate function is associative and commutative.

Note: Reduce the amount of data being sent to the reducers by selecting only the fields that are necessary to the analytic, and handle any bad input conditions properly. A custom partitioner is often overlooked, but taking the time to understand the distribution of output keys and partitioning based on that distribution will improve performance when grouping. Starting a hundred reduce tasks, only to have eighty of them complete in thirty seconds and the others in twenty-five minutes, is not efficient. Aggregations performed by jobs using this pattern typically perform well when the combiner is used properly. These types of operations are what MapReduce was built for.

Calculating the minimum, maximum, and count of a given field are all excellent applications of the numerical summarization pattern. After a grouping operation, the reducer simply iterates through all the values associated with the group and finds the min and max, as well as counts the number of members in the key grouping. Due to the associative and commutative properties, a combiner can be used to vastly cut down on the number of intermediate key/value pairs that need to be shuffled to the reducers. If implemented correctly, the code used for your reducer can be identical to that of a combiner.

Note: The reducer implementation can be used as the job's combiner. Because we are only interested in the count, minimum, and maximum, multiple data points from the same group do not have to be sent to the reducer. The minimum and maximum can be calculated for each local map task without affecting the final minimum and maximum. The counting operation is associative and commutative and won't be harmed by using a combiner.

Below are some use cases for this pattern:
  • Average, median, and standard deviation
  • Min, max, and count
  • Record count
  • Word count
Problem:

1. Given a list of employees with their department and salary, find the maximum and minimum salary in each department.
2. Given a list of employees with their department, find the count of employees in each department.

Here is a sample of the input data, attached as employee_info.csv.

Input Data sample

First Name,Last Name,Job Titles,Department,Full or Part-Time,Salary or Hourly,Typical Hours,Annual Salary,Hourly Rate

dubert,tomasz ,paramedic i/c,fire,f,salary,,91080.00,
edwards,tim p,lieutenant,fire,f,salary,,114846.00,
elkins,eric j,sergeant,police,f,salary,,104628.00,
estrada,luis f,police officer,police,f,salary,,96060.00,
ewing,marie a,clerk iii,police,f,salary,,53076.00,
finn,sean p,firefighter,fire,f,salary,,87006.00,
fitch,jordan m,law clerk,law,f,hourly,35,,14.51
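Before diving into the Hadoop classes, the aggregation itself can be illustrated in plain Java. The sketch below (no Hadoop; the class name `MinMaxCountSketch` is made up for illustration) runs the same per-department min/max/count logic over the sample rows above, skipping rows whose Annual Salary column (index 7) is empty, just as the mapper will:

```java
import java.util.*;

// Plain-Java sketch of the aggregation the MapReduce job performs,
// run against the sample rows above. Only rows with a non-empty
// Annual Salary (index 7) are counted, mirroring the mapper's check.
public class MinMaxCountSketch {

    // returns dept -> [min, max, count]
    public static Map<String, double[]> aggregate(List<String> lines) {
        Map<String, double[]> stats = new HashMap<>();
        for (String line : lines) {
            String[] f = line.split(",", -1);
            if (f.length != 9 || f[7].isEmpty()) {
                continue; // skip malformed rows and hourly rows with no annual salary
            }
            double salary = Double.parseDouble(f[7]);
            stats.merge(f[3], new double[] { salary, salary, 1 },
                    (a, b) -> new double[] { Math.min(a[0], b[0]),
                                             Math.max(a[1], b[1]),
                                             a[2] + b[2] });
        }
        return stats;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
            "dubert,tomasz ,paramedic i/c,fire,f,salary,,91080.00,",
            "edwards,tim p,lieutenant,fire,f,salary,,114846.00,",
            "elkins,eric j,sergeant,police,f,salary,,104628.00,",
            "estrada,luis f,police officer,police,f,salary,,96060.00,",
            "ewing,marie a,clerk iii,police,f,salary,,53076.00,",
            "finn,sean p,firefighter,fire,f,salary,,87006.00,",
            "fitch,jordan m,law clerk,law,f,hourly,35,,14.51");
        for (Map.Entry<String, double[]> e : aggregate(sample).entrySet()) {
            double[] s = e.getValue();
            System.out.println(e.getKey() + "\t" + s[0] + "\t" + s[1] + "\t" + (long) s[2]);
        }
    }
}
```

On the sample above this yields fire → 87006.0/114846.0/3 and police → 53076.0/104628.0/3, with the hourly law-clerk row excluded.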

The CustomMinMaxTuple is a Writable object that stores three values. This class is used as the output value from the mapper. While these values can be crammed into a Text object with some delimiter, it is typically a better practice to create a custom Writable. Not only is it cleaner, but you won’t have to worry about any string parsing when it comes time to grab these values from the reduce phase.


import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class CustomMinMaxTuple implements Writable {

    private Double min = 0d;
    private Double max = 0d;
    private long count = 1;

    public Double getMin() {
        return min;
    }

    public void setMin(Double min) {
        this.min = min;
    }

    public Double getMax() {
        return max;
    }

    public void setMax(Double max) {
        this.max = max;
    }

    public long getCount() {
        return count;
    }

    public void setCount(long count) {
        this.count = count;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // read the fields back in the same order they were written
        min = in.readDouble();
        max = in.readDouble();
        count = in.readLong();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeDouble(min);
        out.writeDouble(max);
        out.writeLong(count);
    }

    @Override
    public String toString() {
        return min + "\t" + max + "\t" + count;
    }
}
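The Writable contract can be exercised without a cluster. The sketch below (plain `java.io` streams, no Hadoop dependency; `TupleRoundTrip` is a hypothetical helper, not part of the job) serializes the three fields in the same order as `write` and reads them back as `readFields` does, showing the binary round trip:

```java
import java.io.*;

// Sketch of the binary round trip the Writable contract performs:
// write() serializes min, max, count in order, and readFields() must
// read them back in exactly the same order.
public class TupleRoundTrip {

    static byte[] write(double min, double max, long count) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeDouble(min);   // same field order as CustomMinMaxTuple.write
        out.writeDouble(max);
        out.writeLong(count);
        return buf.toByteArray();
    }

    static double[] read(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        // same field order as CustomMinMaxTuple.readFields
        return new double[] { in.readDouble(), in.readDouble(), in.readLong() };
    }

    public static void main(String[] args) throws IOException {
        double[] fields = read(write(53076.00, 104628.00, 3));
        System.out.println(fields[0] + "\t" + fields[1] + "\t" + (long) fields[2]);
    }
}
```

If the read order ever drifts from the write order, the deserialized values are silently garbled, which is why keeping the two methods in lockstep matters.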

In the mapper class we split the input line using a comma as the delimiter, then ignore invalid records via the if condition. The Annual Salary is stored at index 7, so we fetch it and store it in outTuple. The salary is set as both the minimum and the maximum so that we can take advantage of the combiner optimization described later. The count field is set to 1, which the reducer uses to count the number of employees in each department.


import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class EmployeeMinMaxCountMapper extends Mapper<Object, Text, Text, CustomMinMaxTuple> {

    private CustomMinMaxTuple outTuple = new CustomMinMaxTuple();
    private Text departmentName = new Text();

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String data = value.toString();
        // a limit of -1 keeps trailing empty fields, so valid rows always have 9 columns
        String[] field = data.split(",", -1);

        if (field.length == 9 && field[7].length() > 0) {
            double salary = Double.parseDouble(field[7]);
            // the same salary seeds both min and max; the reducer/combiner narrows them
            outTuple.setMin(salary);
            outTuple.setMax(salary);
            outTuple.setCount(1);
            departmentName.set(field[3]);
            context.write(departmentName, outTuple);
        }
    }
}
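The `-1` limit passed to `split` is easy to overlook but load-bearing here. The short sketch below (`SplitDemo` is just an illustration name) shows why: without the limit, Java's `split` drops trailing empty strings, so salaried rows, which end with an empty Hourly Rate column, would fail the `field.length == 9` check:

```java
public class SplitDemo {
    public static void main(String[] args) {
        String row = "dubert,tomasz ,paramedic i/c,fire,f,salary,,91080.00,";

        // With limit -1, trailing empty strings are kept, so this row
        // has exactly 9 fields even though Hourly Rate is blank.
        String[] keepTrailing = row.split(",", -1);

        // The default split(",") discards trailing empty strings,
        // leaving only 8 fields for the same row.
        String[] dropTrailing = row.split(",");

        System.out.println(keepTrailing.length + " vs " + dropTrailing.length); // prints "9 vs 8"
    }
}
```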

The reducer iterates through the values to find the minimum and maximum salary, and sums the counts. We start by initializing the output result for each input group. For each value in the group, if the output result's minimum is not yet set, or the value's minimum is less than the result's current minimum, we set the result's minimum to the input value's. The same logic applies to the maximum, except using a greater-than operator. Each value's count is added to a running sum. After determining the minimum and maximum salary from all input values, the final count is set on our output value. The input key is then written to the file system along with the output value.


import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class EmployeeMinMaxCountReducer extends Reducer<Text, CustomMinMaxTuple, Text, CustomMinMaxTuple> {

    private CustomMinMaxTuple result = new CustomMinMaxTuple();

    @Override
    public void reduce(Text key, Iterable<CustomMinMaxTuple> values, Context context)
            throws IOException, InterruptedException {

        // reset the reused output tuple for this group
        result.setMin(null);
        result.setMax(null);
        result.setCount(0);
        long sum = 0;

        for (CustomMinMaxTuple minMaxCountTuple : values) {
            if (result.getMax() == null || minMaxCountTuple.getMax() > result.getMax()) {
                result.setMax(minMaxCountTuple.getMax());
            }
            if (result.getMin() == null || minMaxCountTuple.getMin() < result.getMin()) {
                result.setMin(minMaxCountTuple.getMin());
            }
            sum += minMaxCountTuple.getCount();
        }
        result.setCount(sum);
        context.write(key, result);
    }
}

Using a combiner to optimize the processing

The reducer implementation just shown can be used as the job's combiner. Because we are only interested in the count, minimum, and maximum salary, multiple salary values from the same department do not have to be sent to the reducer. The minimum and maximum salary per department can be calculated for each local map task without affecting the final minimum and maximum. The counting operation is associative and commutative and won't be harmed by using a combiner.
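The reason this is safe can be demonstrated directly. The sketch below (plain Java; `CombinerEquivalence`, `merge`, and `tuple` are illustration names, and the salaries are the sample values from above) merges (min, max, count) tuples the way the reducer does, and shows that pre-combining a mapper's local values first produces the same final tuple as reducing everything at once:

```java
import java.util.*;

// Sketch illustrating why the reducer can be reused as a combiner:
// merging (min, max, count) tuples is associative and commutative,
// so pre-aggregating per-map-task values yields the same final
// result as reducing all values in one pass.
public class CombinerEquivalence {

    // merge two (min, max, count) tuples, the same logic the reducer applies
    static double[] merge(double[] a, double[] b) {
        return new double[] { Math.min(a[0], b[0]), Math.max(a[1], b[1]), a[2] + b[2] };
    }

    // a single salary becomes the tuple (salary, salary, 1), as in the mapper
    static double[] tuple(double salary) {
        return new double[] { salary, salary, 1 };
    }

    public static void main(String[] args) {
        // all three values reduced directly
        double[] direct = merge(merge(tuple(91080), tuple(114846)), tuple(87006));

        // two map tasks, each pre-combining its own values first
        double[] mapper1 = merge(tuple(91080), tuple(114846));
        double[] mapper2 = tuple(87006);
        double[] combined = merge(mapper1, mapper2);

        System.out.println(Arrays.equals(direct, combined)); // prints "true"
    }
}
```

Contrast this with a non-associative aggregate like the median: partial medians from each map task cannot be merged into the true median, so that summarization cannot reuse its reducer as a combiner.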

Finally, we use a driver class to verify that everything works as expected.


import java.io.File;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driver {

    public static void main(String[] args) throws Exception {

        if (args.length != 2) {
            System.err.println("Please specify the input and output path");
            System.exit(-1);
        }

        /* set the hadoop system parameter */
        System.setProperty("hadoop.home.dir", "Replace this string with hadoop home directory location");

        /* delete the output directory before running the job */
        FileUtils.deleteDirectory(new File(args[1]));

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Driver.class);
        job.setJobName("Find_Min_Max_Count");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(EmployeeMinMaxCountMapper.class);
        // the reducer doubles as the combiner, as discussed above
        job.setCombinerClass(EmployeeMinMaxCountReducer.class);
        job.setReducerClass(EmployeeMinMaxCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(CustomMinMaxTuple.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
