MapReduce example for calculating standard deviation and median on sample data

The standard deviation shows how much the data varies from the average, so the average must be known before the reduce phase can compute it. The easiest way to perform these operations is to copy the list of values into a temporary list in order to find the median, and then iterate over that list a second time to determine the standard deviation. With large data sets, this implementation may result in Java heap space issues, because every value for an input group is copied into memory.
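Concretely, the two passes can be sketched in plain Java, outside Hadoop. This is only an illustrative sketch; the salary values are the fire-department rows from the sample data below:

```java
import java.util.Arrays;

public class TwoPassStats {

    // the median requires holding and sorting the whole list of values
    static double median(double[] values) {
        double[] sorted = values.clone();
        Arrays.sort(sorted);
        int n = sorted.length;
        return n % 2 == 0
                ? (sorted[n / 2 - 1] + sorted[n / 2]) / 2
                : sorted[n / 2];
    }

    // the standard deviation needs the mean first, hence a second iteration
    static double sampleSd(double[] values) {
        double sum = 0;
        for (double v : values) {
            sum += v;
        }
        double mean = sum / values.length;

        double sumOfSquares = 0;
        for (double v : values) {
            sumOfSquares += (v - mean) * (v - mean);
        }
        return Math.sqrt(sumOfSquares / (values.length - 1));
    }

    public static void main(String[] args) {
        double[] salaries = { 91080.00, 114846.00, 87006.00 };
        System.out.println(sampleSd(salaries) + "\t" + median(salaries));
    }
}
```

Both the full copy of the values and the second iteration are what the reducer below has to do per department, which is where the memory pressure comes from.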

Problem

Given a list of employee salaries and departments, determine the median and standard deviation of salary for each department.

Here is a sample of the attached input data, employee_info.csv.

Input Data sample

First Name,Last Name,Job Titles,Department,Full or Part-Time,Salary or Hourly,Typical Hours,Annual Salary,Hourly Rate

dubert,tomasz ,paramedic i/c,fire,f,salary,,91080.00,
edwards,tim p,lieutenant,fire,f,salary,,114846.00,
elkins,eric j,sergeant,police,f,salary,,104628.00,
estrada,luis f,police officer,police,f,salary,,96060.00,
ewing,marie a,clerk iii,police,f,salary,,53076.00,
finn,sean p,firefighter,fire,f,salary,,87006.00,
fitch,jordan m,law clerk,law,f,hourly,35,,14.51

Custom Writable

The EmployeeStandardDeviationTuple is a Writable object that stores two values: the standard deviation and the median. This class is used as the output value from the reducer. While these values could be crammed into a Text object with some delimiter, it is typically better practice to create a custom Writable.


import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class EmployeeStandardDeviationTuple implements Writable {

    private double sd;
    private double median;

    public double getSd() {
        return sd;
    }

    public void setSd(double sd) {
        this.sd = sd;
    }

    public double getMedian() {
        return median;
    }

    public void setMedian(double median) {
        this.median = median;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // fields must be read in the same order they were written
        sd = in.readDouble();
        median = in.readDouble();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeDouble(getSd());
        out.writeDouble(getMedian());
    }

    @Override
    public String toString() {
        return sd + "\t" + median;
    }
}
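The write/readFields pair can be exercised without a cluster. The sketch below round-trips two doubles through a byte stream, mirroring what Hadoop does when it serializes the tuple between tasks (the values are made up):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class TupleRoundTrip {

    // serialize then deserialize (sd, median), as write()/readFields() do
    static double[] roundTrip(double sd, double median) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeDouble(sd);
        out.writeDouble(median);

        DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(bytes.toByteArray()));
        // readFields must consume fields in the order write emitted them
        return new double[] { in.readDouble(), in.readDouble() };
    }

    public static void main(String[] args) throws IOException {
        double[] restored = roundTrip(15035.99, 91080.00);
        System.out.println(restored[0] + "\t" + restored[1]);
    }
}
```

If readFields consumed median before sd, every tuple would silently swap its two values, so the field order is effectively part of the wire format.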

Mapper code

The mapper processes each input record and writes the department, parsed from the input line, as the output key and the employee's annual salary as the output value. Hourly employees, whose annual salary field is empty, are skipped.


import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class EmployeeStandardDeviationMapper extends Mapper<Object, Text, Text, DoubleWritable> {

    private Text departmentName = new Text();
    private DoubleWritable salaryWritable = new DoubleWritable(0);

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String data = value.toString();
        // the limit of -1 keeps trailing empty fields, so hourly rows still split into 9 columns
        String[] field = data.split(",", -1);
        // skip hourly rows (empty annual salary) and the header row
        if (field.length == 9 && field[7].length() > 0 && !"Annual Salary".equalsIgnoreCase(field[7])) {
            // field[3] is the department, field[7] the annual salary
            salaryWritable.set(Double.parseDouble(field[7]));
            departmentName.set(field[3]);
            context.write(departmentName, salaryWritable);
        }
    }
}
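The parsing step can be checked in isolation. This sketch applies the mapper's core field checks to the first salaried and the hourly record from the sample input:

```java
public class MapperParseCheck {

    // returns "department<TAB>salary" for salaried rows, null otherwise,
    // following the mapper's core field checks
    static String parse(String line) {
        // the limit of -1 keeps trailing empty fields, so hourly rows
        // still split into 9 columns
        String[] field = line.split(",", -1);
        if (field.length == 9 && field[7].length() > 0) {
            return field[3] + "\t" + Double.parseDouble(field[7]);
        }
        return null;
    }

    public static void main(String[] args) {
        // salaried row: emitted as (fire, 91080.0)
        System.out.println(parse("dubert,tomasz ,paramedic i/c,fire,f,salary,,91080.00,"));
        // hourly row: annual salary is empty, so it is filtered out
        System.out.println(parse("fitch,jordan m,law clerk,law,f,hourly,35,,14.51"));
    }
}
```

Note that with the default split limit of 0 the trailing empty columns would be dropped, the hourly row would have fewer than 9 fields, and the length check would reject salaried rows as well.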

Reducer Code

The reducer iterates through the given set of values, adding each value to an in-memory list while keeping a running sum and count. After the iteration, the list of salaries is sorted to find the median. If the list has an odd number of entries, the median is the middle value; if even, the middle two values are averaged. Next, the mean is computed from the running sum and count, and a running sum of squared deviations is accumulated by iterating through the sorted list and squaring the difference between each salary and the mean. The standard deviation is then calculated from this sum. Finally, the median and standard deviation are output along with the input key.


import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class EmployeeStandardDeviationReducer
        extends Reducer<Text, DoubleWritable, Text, EmployeeStandardDeviationTuple> {

    private List<Double> list = new ArrayList<Double>();
    private EmployeeStandardDeviationTuple employeeStandardDeviationTuple = new EmployeeStandardDeviationTuple();

    @Override
    public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException {

        double sum = 0;
        double count = 0;
        list.clear();
        employeeStandardDeviationTuple.setMedian(0);
        employeeStandardDeviationTuple.setSd(0);

        // keep a running sum and count while copying the values into memory
        for (DoubleWritable doubleWritable : values) {
            sum += doubleWritable.get();
            count++;
            list.add(doubleWritable.get());
        }

        Collections.sort(list);
        int length = list.size();
        double median;

        if (length % 2 == 0) {
            // even number of entries: average the middle two values
            median = (list.get(length / 2 - 1) + list.get(length / 2)) / 2;
        } else {
            // odd number of entries: take the middle value
            median = list.get(length / 2);
        }
        employeeStandardDeviationTuple.setMedian(median);

        // second pass: sum of squared deviations from the mean
        double mean = sum / count;
        double sumOfSquares = 0;
        for (double salary : list) {
            sumOfSquares += (salary - mean) * (salary - mean);
        }

        // sample standard deviation (divides by count - 1)
        employeeStandardDeviationTuple.setSd(Math.sqrt(sumOfSquares / (count - 1)));

        context.write(key, employeeStandardDeviationTuple);
    }
}
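As a sanity check, the reducer's arithmetic can be replayed by hand for the police department from the sample input (three salaried rows: 104628, 96060, 53076). This sketch repeats the reducer's steps outside Hadoop:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ReducerSanityCheck {

    // repeats the reducer's steps and returns { sd, median }
    static double[] reduce(double[] salaries) {
        List<Double> list = new ArrayList<Double>();
        double sum = 0;
        double count = 0;
        for (double salary : salaries) {
            sum += salary;
            count++;
            list.add(salary);
        }
        Collections.sort(list);

        int length = list.size();
        double median = length % 2 == 0
                ? (list.get(length / 2 - 1) + list.get(length / 2)) / 2
                : list.get(length / 2);

        double mean = sum / count;
        double sumOfSquares = 0;
        for (double salary : list) {
            sumOfSquares += (salary - mean) * (salary - mean);
        }
        double sd = Math.sqrt(sumOfSquares / (count - 1));
        return new double[] { sd, median };
    }

    public static void main(String[] args) {
        double[] result = reduce(new double[] { 104628.00, 96060.00, 53076.00 });
        // odd count, so the median is the middle sorted value: 96060.0
        System.out.println("police\t" + result[0] + "\t" + result[1]);
    }
}
```

The law department produces no output at all, because its only employee is hourly and was filtered out by the mapper.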

Driver Code

Finally, the driver class wires everything together so we can verify the job works as expected.


import java.io.File;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DriverStandardDeviation {

    public static void main(String[] args) throws Exception {

        /*
         * Local paths on Windows are used here for testing; change them
         * as per your local machine.
         */
        args = new String[] { "Replace this string with Input Path location",
                "Replace this string with output Path location" };

        if (args.length != 2) {
            System.err.println("Please specify the input and output path");
            System.exit(-1);
        }

        /* delete the output directory before running the job */
        FileUtils.deleteDirectory(new File(args[1]));

        /* set the hadoop system parameter */
        System.setProperty("hadoop.home.dir", "Replace this string with hadoop home directory location");

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(DriverStandardDeviation.class);
        job.setJobName("Find_Standard_Deviation_Salary");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(EmployeeStandardDeviationMapper.class);
        job.setReducerClass(EmployeeStandardDeviationReducer.class);
        // the map output value (DoubleWritable) differs from the reduce
        // output value (EmployeeStandardDeviationTuple), so set both
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(EmployeeStandardDeviationTuple.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
