Problem to Solve: Given a list of employees with their department and salary, find the average salary in each department.
A sample of the input data is attached as employee_info.csv.
To calculate an average, we need two values for each group: the sum of the values that we want to average and the number of values that went into the sum. These two values can be calculated trivially on the reduce side by iterating through each value in the set, adding to a running sum, and keeping a count. After the iteration, simply divide the sum by the count and output the average. However, if we do it this way we cannot reuse the same reducer implementation as a combiner, because calculating an average is not an associative operation. Instead, our mapper will output two “columns” of data, count and average. For each input record, these will simply be 1 and the value of the salary field. The reducer will multiply the count field by the average field, add the product to a running sum, and add the count field to a running count. It will then divide the running sum by the running count and output the count along with the calculated average. With this more roundabout algorithm, the reducer code can be used as a combiner, because associativity is preserved.
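To see why carrying the count matters, here is a minimal sketch in plain Java (the numbers are made up for illustration) contrasting a naive average of averages with the count-weighted version:

public class AverageOfAveragesDemo {
    public static void main(String[] args) {
        // Suppose one department's salaries {100000, 200000, 300000} are split
        // across two combiner groups: {100000, 200000} and {300000}
        double avgA = 150000, countA = 2; // partial average and count of group A
        double avgB = 300000, countB = 1; // partial average and count of group B

        // Naive average of averages: (150000 + 300000) / 2 = 225000 -- wrong
        double naive = (avgA + avgB) / 2;

        // Count-weighted: (150000*2 + 300000*1) / (2 + 1) = 200000 -- the true average
        double weighted = (avgA * countA + avgB * countB) / (countA + countB);

        System.out.println("naive = " + naive + ", weighted = " + weighted);
    }
}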
Input Data sample
First Name,Last Name,Job Titles,Department,Full or Part-Time,Salary or Hourly,Typical Hours,Annual Salary,Hourly Rate
dubert,tomasz ,paramedic i/c,fire,f,salary,,91080.00,
edwards,tim p,lieutenant,fire,f,salary,,114846.00,
elkins,eric j,sergeant,police,f,salary,,104628.00,
estrada,luis f,police officer,police,f,salary,,96060.00,
ewing,marie a,clerk iii,police,f,salary,,53076.00,
finn,sean p,firefighter,fire,f,salary,,87006.00,
fitch,jordan m,law clerk,law,f,hourly,35,,14.51
Custom Writable Class
The CustomAverageTuple is a Writable object that stores two values. This class is used as the output value from the mapper. While these values could be crammed into a Text object with some delimiter, it is typically better practice to create a custom Writable. Not only is it cleaner, but you won’t have to worry about any string parsing when it comes time to grab these values in the reduce phase.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class CustomAverageTuple implements Writable {

    private double average = 0;
    private long count = 1;

    public double getAverage() {
        return average;
    }

    public void setAverage(double average) {
        this.average = average;
    }

    public long getCount() {
        return count;
    }

    public void setCount(long count) {
        this.count = count;
    }

    // Deserialize the fields in the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        average = in.readDouble();
        count = in.readLong();
    }

    // Serialize the fields so the tuple can travel between map and reduce
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeDouble(average);
        out.writeLong(count);
    }

    @Override
    public String toString() {
        return average + "\t" + count;
    }
}
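As a quick sanity check outside Hadoop, the Writable contract can be exercised with plain Java streams. This round-trip sketch is not part of the original walkthrough, and the class name is made up:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class CustomAverageTupleRoundTrip {
    public static void main(String[] args) throws Exception {
        CustomAverageTuple original = new CustomAverageTuple();
        original.setAverage(91080.00);
        original.setCount(1);

        // Serialize the tuple to bytes, as Hadoop does between map and reduce
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize into a fresh tuple and confirm the fields survive the trip
        CustomAverageTuple copy = new CustomAverageTuple();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy); // expected: 91080.0	1
    }
}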
Mapper Class
In the mapper class we split each input record using a comma as the delimiter, and the if condition (plus a parse guard) ignores invalid records, such as the CSV header row and hourly employees whose Annual Salary field is empty. The annual salary is stored at index 7 of the split record, so we fetch it and store it in the output tuple. The second column is a count of 1, which the reducer uses to count the number of employees in each department. The output value therefore has two columns: a count and the salary of the employee. Because the mapper operates on one record at a time, the count is simply 1 and the average is equivalent to the salary of that one employee. These two values are output in a custom Writable, a CustomAverageTuple.
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class AverageSalaryPerDepartmentMapper extends Mapper<Object, Text, Text, CustomAverageTuple> {

    private CustomAverageTuple averageTuple = new CustomAverageTuple();
    private Text departmentName = new Text();

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String[] field = value.toString().split(",", -1);
        // Ignore malformed rows and rows with an empty Annual Salary (e.g. hourly employees)
        if (field.length == 9 && field[7].length() > 0) {
            double salary;
            try {
                salary = Double.parseDouble(field[7]);
            } catch (NumberFormatException e) {
                return; // skip the header row and any other non-numeric salary value
            }
            averageTuple.setAverage(salary); // one record, so the "average" is just this salary
            averageTuple.setCount(1);
            departmentName.set(field[3]);
            context.write(departmentName, averageTuple);
        }
    }
}
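For the sample input above, the mapper emits the following key/value pairs (the department key, then the tuple's average and count); the header row and the hourly law clerk row are skipped because their Annual Salary field is non-numeric or empty:

fire	91080.0	1
fire	114846.0	1
police	104628.0	1
police	96060.0	1
police	53076.0	1
fire	87006.0	1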
Reducer Class
The reducer code iterates through all the given salaries for the department and keeps two local variables: a running sum and a running count. For each value, the count is multiplied by the average and added to the running sum, and the count is simply added to the running count. After the iteration, the input key is written to the file system along with the count and the average, calculated by dividing the running sum by the running count.
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class AverageSalaryPerDepartmentReducer extends Reducer<Text, CustomAverageTuple, Text, CustomAverageTuple> {

    private CustomAverageTuple result = new CustomAverageTuple();

    @Override
    public void reduce(Text key, Iterable<CustomAverageTuple> values, Context context)
            throws IOException, InterruptedException {
        double sum = 0;
        long count = 0;
        for (CustomAverageTuple customAverageTuple : values) {
            // Multiplying average by count recovers the partial sum, which is
            // what makes this reducer safe to reuse as a combiner
            sum = sum + customAverageTuple.getAverage() * customAverageTuple.getCount();
            count = count + customAverageTuple.getCount();
        }
        result.setCount(count);
        result.setAverage(sum / count);
        context.write(key, result);
    }
}
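Running the job over the sample rows should produce output like the following, since fire has (91080 + 114846 + 87006) / 3 = 97644 and police has (104628 + 96060 + 53076) / 3 = 84588; the law department does not appear because its only row is hourly:

fire	97644.0	3
police	84588.0	3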
Using a combiner to optimize the processing
When determining an average, the reducer code can be used as a combiner as long as the count is output along with the average. An average is not an associative operation, but if the combiner outputs the count together with the partial average, these two values can be multiplied to recover the partial sum in the final reduce phase. Without outputting the count, a combiner cannot be used, because an average of averages is not equivalent to the true average.
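Enabling this takes one extra line in the driver (also included in the full driver below):

// Safe to reuse the reducer as a combiner only because it emits (average, count),
// from which the final reduce phase can reconstruct each partial sum
job.setCombinerClass(AverageSalaryPerDepartmentReducer.class);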
Driver Class
Finally, we use the driver class to wire the job together and verify that everything works as expected.
import java.io.File;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DriverAverage {

    public static void main(String[] args) throws Exception {

        /*
         * I have used my local path in Windows; change the paths as per your
         * local machine
         */
        args = new String[] { "Replace this string with Input Path location",
                "Replace this string with output Path location" };

        if (args.length != 2) {
            System.err.println("Please specify the input and output path");
            System.exit(-1);
        }

        /* delete the output directory before running the job */
        FileUtils.deleteDirectory(new File(args[1]));

        /* set the hadoop system parameter */
        System.setProperty("hadoop.home.dir", "replace this with hadoop home directory location");

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(DriverAverage.class);
        job.setJobName("Find_Average_Salary");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(AverageSalaryPerDepartmentMapper.class);
        // The reducer doubles as a combiner because it outputs the count with the average
        job.setCombinerClass(AverageSalaryPerDepartmentReducer.class);
        job.setReducerClass(AverageSalaryPerDepartmentReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(CustomAverageTuple.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
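Once the classes are compiled and packaged, the job can be launched with the standard hadoop jar command. The jar name and HDFS paths below are placeholders, not from the original post, and note that the driver above hard-codes its input and output paths, so the trailing arguments are ignored until those placeholder lines are removed:

hadoop jar average-salary.jar DriverAverage /user/hadoop/employee_info.csv /user/hadoop/average-salary-output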
Hello Adarsh,
First of all, thanks for posting the code walkthrough. I am trying to implement exactly the same thing using your tutorial.
I am new to Java/Hadoop.
While compiling the code, I am getting the error below:
“ConfigurationFactory cannot be resolved”
I have copied all the import statements from your code snippet, but I am still getting this error. Not sure why.
I appreciate your help.
Thank you in advance.
Hi Adarsh,
If I import “org.apache.commons.configuration.ConfigurationFactory” to resolve ConfigurationFactory, then I get a “The method getInstance() is undefined for the type ConfigurationFactory” error.
Thank you
Replace that line with the code below: Configuration conf = new org.apache.hadoop.conf.Configuration();