Finding outliers is an important part of data analysis because these records are typically the most interesting and unique pieces of data in the set. The point of this pattern is to find the best records for a specific criterion so that you can take a look at them and perhaps figure out what caused them to be so special. If you can define a ranking function or comparison function between two records that determines whether one is higher than the other, you can apply this pattern to use MapReduce to find the records with the highest value across your entire data set. The number of output records should also be significantly smaller than the number of input records, because at a certain point it simply makes more sense to do a total ordering of the data set. This pattern utilizes both the mapper and the reducer. The mappers find their local top N, and then all of the individual top N sets compete for the final top N in the reducer. Since the number of records coming out of each mapper is at most N, and N is relatively small, we only need one reducer.
The mapper reads each record and keeps an array-like structure of size N that collects the largest N values seen so far. In the cleanup phase of the mapper we finally emit the N records stored in that structure as values, with a null key. These are the top N for this particular map task.
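To make that generic description concrete, here is a minimal sketch of the local top-N bookkeeping, independent of the employee example shown later in this post. The class name, the value of N, and the score method are placeholders, not part of the actual job.

// Minimal sketch of the per-mapper "local top N" bookkeeping.
// N and the score() logic are placeholders; the full employee example follows below.
import java.io.IOException;
import java.util.TreeMap;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LocalTopNMapper extends Mapper<Object, Text, NullWritable, Text> {

    private static final int N = 10; // placeholder size of the local top-N set
    private final TreeMap<Double, Text> topN = new TreeMap<Double, Text>();

    @Override
    public void map(Object key, Text value, Context context) {
        double score = score(value);      // hypothetical ranking function
        topN.put(score, new Text(value));
        if (topN.size() > N) {
            topN.remove(topN.firstKey()); // drop the smallest so only N remain
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Emit this mapper's local top N with a null key so that all
        // candidates end up in a single reduce group.
        for (Text record : topN.values()) {
            context.write(NullWritable.get(), record);
        }
    }

    private double score(Text record) {
        return 0.0; // placeholder: parse the record and return its ranking value
    }
}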
Uses of this pattern
- Catchy dashboards that display the top selling products or other items of interest.
- Selecting interesting data
- Anomaly analysis – examining odd or outlying records
Pig will have issues performing this query in any sort of optimal way, because ordering is expensive, so MapReduce is a better fit for this pattern. Below is the Pig snippet that can be used, but it is not efficient.
HighestPaidEmployee = ORDER Employee_Data BY salary DESC;
TopPaidEmployees = LIMIT HighestPaidEmployee 10;
DUMP TopPaidEmployees;
Performance analysis
The number we need to pay attention to when using this pattern is how many records the reducer receives. Each map task outputs at most N records, and the job will consist of M map tasks, so the reducer has to work through roughly N * M records. For example, a job with 1,000 map tasks and N = 10 sends about 10,000 records to the single reducer, which is easily manageable; with N = 10,000 it would be 10 million. If N is small, in the tens or hundreds, the top ten pattern typically performs very well, and the only limitation comes from the use of a single reducer, regardless of the number of records it is handling.
A single reducer getting a lot of data is bad for a few reasons:
1. The sort can become an expensive operation when it has too many records and has to do most of the sorting on local disk instead of in memory.
2. The host where the reducer is running will receive a lot of data over the network, which may create a network resource hot spot for that single host.
3. Naturally, scanning through all the data in the reduce will take a long time if there are many records to look through.
4. Any sort of memory growth in the reducer has the possibility of blowing through the Java Virtual Machine's memory. For example, if you are collecting all of the values into an ArrayList to compute a median, that ArrayList can get very big. This will not be a particular problem if you're really looking for the top ten items, but if you want to extract a very large number you may run into memory limits.
5. Writes to the output file are not parallelized. Again, this is not an issue for the top ten, but it becomes a factor when the data extracted is very large.
Note: This pattern is intended only for fairly small values of N, in the tens or hundreds at most, though you can likely push it a bit further. There is a fuzzy line past which simply doing a total ordering of the data set is likely more effective.
Problem to Solve:
1. Given a list of employees with their information, find the top 10 highest paid employees.
Here is a sample of the input data, attached as employee_info.csv.
Input Data sample
First Name,Last Name,Job Titles,Department,Full or Part-Time,Salary or Hourly,Typical Hours,Annual Salary,Hourly Rate
dubert,tomasz ,paramedic i/c,fire,f,salary,,91080.00,
edwards,tim p,lieutenant,fire,f,salary,,114846.00,
elkins,eric j,sergeant,police,f,salary,,104628.00,
estrada,luis f,police officer,police,f,salary,,96060.00,
ewing,marie a,clerk iii,police,f,salary,,53076.00,
finn,sean p,firefighter,fire,f,salary,,87006.00,
fitch,jordan m,law clerk,law,f,hourly,35,,14.51
Mapper Code
Each mapper determines the top ten records of its input split and outputs them to the reduce phase. The mappers are essentially filtering their input split down to its top ten records, and the reducer is responsible for the final ten. Just remember to configure your job to use only one reducer. The mapper processes all input records and stores them in a TreeMap. If there are more than ten records in our TreeMap, the first element (the lowest salary) is removed. After all the records have been processed, the top ten records in the TreeMap are output to the reducer in the cleanup method. This method is called once after all key/value pairs have been passed through map, just as setup is called once before any calls to map.
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class HighestPaidEmployeeMapper extends Mapper<Object, Text, NullWritable, Text> {

    // Holds this mapper's local top ten records, keyed by salary.
    // Note: records with identical salaries overwrite each other in the TreeMap.
    private TreeMap<Double, Text> salaryData = new TreeMap<Double, Text>();

    @Override
    public void map(Object key, Text values, Context context) throws IOException, InterruptedException {
        String data = values.toString();
        String[] field = data.split(",", -1);
        Double salary = null;
        if (null != field && field.length == 9 && field[7].length() > 0) {
            salary = Double.parseDouble(field[7]);
            Text val = new Text(values);
            salaryData.put(salary, val);
            // Keep only the ten highest salaries seen so far.
            if (salaryData.size() > 10) {
                salaryData.remove(salaryData.firstKey());
            }
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Emit this mapper's local top ten with a null key so that all
        // candidates end up in a single reduce group.
        for (Map.Entry<Double, Text> entry : salaryData.entrySet()) {
            context.write(NullWritable.get(), entry.getValue());
        }
    }
}
Reducer Code
Overall, the reducer determines its top ten records in a way that is very similar to the mapper. Because we configured our job to have one reducer using job.setNumReduceTasks(1) and we used NullWritable as our key, there will be one input group for this reducer that contains all the potential top ten records. The reducer iterates through all these records and stores them in a TreeMap. If the TreeMap's size goes above ten, the first element (the lowest value) is removed from the map. After all the values have been iterated over, the values contained in the TreeMap are flushed to the file system in descending order. This ordering is achieved by getting the descending map from the TreeMap prior to outputting the values. This can be done directly in the reduce method, because there will be only one input group, but doing it in the cleanup method would also work.
import java.io.IOException;
import java.util.TreeMap;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class HighestPaidEmployeeReducer extends Reducer<NullWritable, Text, NullWritable, Text> {

    // Holds the final top ten records, keyed by salary.
    private TreeMap<Double, Text> salaryData = new TreeMap<Double, Text>();

    @Override
    public void reduce(NullWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            String data = value.toString();
            String[] field = data.split(",", -1);
            if (field.length == 9) {
                salaryData.put(Double.parseDouble(field[7]), new Text(value));
                // Keep only the ten highest salaries.
                if (salaryData.size() > 10) {
                    salaryData.remove(salaryData.firstKey());
                }
            }
        }
        // Write the final top ten in descending order of salary.
        for (Text t : salaryData.descendingMap().values()) {
            context.write(NullWritable.get(), t);
        }
    }
}
Note: There is no need for a combiner in this job, although the reducer code could technically be used. The combiner would simply output the same ten records and thus cause unnecessary processing. Also, this job is hardcoded to find the top ten records, but it could easily be configured to find the top N records using a variable captured in the setup method, as sketched below.
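As a rough illustration of that last point, here is a minimal sketch of how the mapper could pick N up from the job configuration in setup. The property name "top.n", the class name, and the default of 10 are assumptions for illustration, not part of the original job.

// Sketch only (not part of the original job): making N configurable.
// The property name "top.n" and the default of 10 are assumptions.
import java.io.IOException;
import java.util.TreeMap;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ConfigurableTopNMapper extends Mapper<Object, Text, NullWritable, Text> {

    private int topN;
    private TreeMap<Double, Text> salaryData = new TreeMap<Double, Text>();

    @Override
    protected void setup(Context context) {
        // Read N from the job configuration; falls back to 10 if unset.
        topN = context.getConfiguration().getInt("top.n", 10);
    }

    @Override
    public void map(Object key, Text values, Context context) {
        String[] field = values.toString().split(",", -1);
        if (field.length == 9 && field[7].length() > 0) {
            salaryData.put(Double.parseDouble(field[7]), new Text(values));
            if (salaryData.size() > topN) {
                salaryData.remove(salaryData.firstKey());
            }
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        for (Text record : salaryData.values()) {
            context.write(NullWritable.get(), record);
        }
    }
}

In the driver, the value would then be supplied with something like conf.setInt("top.n", 25) before the Job instance is created, and the reducer would read the same property in its own setup method.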
Driver Code
Finally, we use the driver class to verify that everything works as expected. The output will contain the top 10 highest paid employees.
import java.io.File;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.hadoop.design.summarization.blog.ConfigurationFactory;

public class DriverHighestPaidEmployee {

    public static void main(String[] args) throws Exception {

        /*
         * I have used my local path in Windows; change the paths as per your
         * local machine.
         */
        args = new String[] { "Replace this string with Input Path location",
                "Replace this string with output Path location" };

        /* delete the output directory before running the job */
        FileUtils.deleteDirectory(new File(args[1]));

        /* set the hadoop system parameter */
        System.setProperty("hadoop.home.dir", "Replace this string with hadoop home directory location");

        if (args.length != 2) {
            System.err.println("Please specify the input and output path");
            System.exit(-1);
        }

        Configuration conf = ConfigurationFactory.getInstance();
        Job job = Job.getInstance(conf);
        job.setJarByClass(DriverHighestPaidEmployee.class);
        job.setJobName("Highest_Paid_Employees");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(HighestPaidEmployeeMapper.class);
        job.setReducerClass(HighestPaidEmployeeReducer.class);
        // A single reducer collects every mapper's local top ten.
        job.setNumReduceTasks(1);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Output from reducer
EVANS,GINGER S,COMMISSIONER OF AVIATION,AVIATION,F,Salary,,300000.00,
JOHNSON,EDDIE T,SUPERINTENDENT OF POLICE,POLICE,F,Salary,,260004.00,
EMANUEL,RAHM ,MAYOR,MAYOR’S OFFICE,F,Salary,,216210.00,
SANTIAGO,JOSE A,FIRE COMMISSIONER,FIRE,F,Salary,,202728.00,
FORD II,RICHARD C,FIRST DEPUTY FIRE COMMISSIONER,FIRE,F,Salary,,197736.00,
NAVARRO,KEVIN B,FIRST DEPUTY SUPERINTENDENT,POLICE,F,Salary,,197724.00,
DEAL,AARON J,CHIEF OF STAFF,MAYOR’S OFFICE,F,Salary,,195000.00,
VASQUEZ,ANTHONY P,DEPUTY FIRE COMMISSIONER,FIRE,F,Salary,,187680.00,
WEST,BARBARA J,CHIEF,POLICE,F,Salary,,185364.00,
VOGT,WILLIAM C,ASST DEPUTY FIRE COMMISSIONER,FIRE,F,Salary,,185352.00,