MapReduce Count Example Using the MapReduce Framework's Counters Utility

A count or summation can tell you a lot about particular fields of your data, or about your data set as a whole. Hourly ingest record counts can be post-processed to generate helpful histograms. Some counters come built into the framework, such as the number of input/output records and bytes. Hadoop also allows programmers to create their own custom counters for whatever their needs may be. This pattern describes how to utilize these custom counters to gather count or summation metrics from your data sets.

The major benefit of using counters is that all the counting can be done during the map phase. The mapper processes one input record at a time, incrementing counters based on certain criteria. A counter is incremented by one when counting single instances, or by some number when executing a summation. These counters are then aggregated by the TaskTrackers running the tasks and incrementally reported to the JobTracker for overall aggregation upon job success. Counters from any failed tasks are disregarded by the JobTracker in the final summation. As this is a map-only job, no combiner, partitioner, or reducer is required.
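The aggregation described above can be sketched in plain Java, with no Hadoop dependency: each map task keeps its own in-memory counters, and only the counters from successful tasks are merged into the job total. The class and method names here are hypothetical, chosen just for this illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CounterAggregationSketch {

    // Models one map task's in-memory counters (counter name -> count).
    static Map<String, Long> runTask(List<String> departments) {
        Map<String, Long> counters = new HashMap<>();
        for (String dept : departments) {
            counters.merge(dept, 1L, Long::sum); // one increment(1) per record
        }
        return counters;
    }

    // Models the JobTracker summing counters from *successful* tasks only;
    // counters from failed tasks are simply never merged in.
    static Map<String, Long> aggregate(List<Map<String, Long>> successfulTasks) {
        Map<String, Long> total = new HashMap<>();
        for (Map<String, Long> task : successfulTasks) {
            task.forEach((name, value) -> total.merge(name, value, Long::sum));
        }
        return total;
    }
}
```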

Use Cases for Counters
  1. You want to gather counts or summations over large data sets.
  2. The number of counters you are going to create is small, in the double digits.
Problem to Solve

Given a list of employees with their departments, find the number of employees in the POLICE, FIRE, and LAW departments using the MapReduce framework's counters utility.

Note: The caveat to using counters is that they are all stored in memory by the JobTracker. The counters are serialized by each map task and sent with status updates. To play nice and not bog down the JobTracker, the number of counters should be in the tens, and definitely not more than 100.

The final output is a set of counters grabbed from the job framework; there is no actual output from the analytic itself. However, the job requires an output directory to execute. This directory will exist and contain a number of empty part files equal to the number of map tasks; it should be deleted on job completion. Using counters is very fast, as data is simply read in through the mapper and no output is written. Performance depends largely on the number of map tasks being executed and how much time it takes to process each record.

Here is a sample of the input data, attached as employee_info.csv.

Input Data sample for reference

First Name,Last Name,Job Titles,Department,Full or Part-Time,Salary or Hourly,Typical Hours,Annual Salary,Hourly Rate

dubert,tomasz ,paramedic i/c,fire,f,salary,,91080.00,
edwards,tim p,lieutenant,fire,f,salary,,114846.00,
elkins,eric j,sergeant,police,f,salary,,104628.00,
estrada,luis f,police officer,police,f,salary,,96060.00,
ewing,marie a,clerk iii,police,f,salary,,53076.00,
finn,sean p,firefighter,fire,f,salary,,87006.00,
fitch,jordan m,law clerk,law,f,hourly,35,,14.51
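The mapper below splits each record with `split(",", -1)`; the `-1` limit keeps trailing empty fields, so a salaried row (which ends with an empty Hourly Rate column) still yields all 9 fields, with the department at index 3. A minimal plain-Java check of that behavior, using a hypothetical helper class:

```java
class CsvSplitSketch {

    // Splits a CSV record the same way the mapper does: limit -1 keeps
    // trailing empty strings, so every well-formed row has 9 fields.
    static String[] splitRecord(String line) {
        return line.split(",", -1);
    }
}
```

With the default limit of 0, the trailing empty fields would be dropped and the salaried rows would fail the 9-field validation check.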

Mapper Code

In the mapper class we split the input data using a comma as the delimiter, and the if condition checks for and ignores invalid records. The department is stored at index 3, so we fetch it and use it as the counter name. Here we want to count the number of employees in the POLICE, FIRE, and LAW departments, so only those departments are added to the departmentSet; additional departments can be added if required. If the processed record belongs to one of the required departments, the corresponding counter is incremented by one.

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CustomEmployeeCounter extends Mapper<Object, Text, NullWritable, NullWritable> {

    public static final String DEPARTMENT_GROUP = "DEPARTMENT_GROUP";
    public static final String POLICE_COUNTER = "POLICE";
    public static final String FIRE_COUNTER = "FIRE";
    public static final String LAW_COUNTER = "LAW";

    private String[] departmentArray = new String[] { POLICE_COUNTER, FIRE_COUNTER, LAW_COUNTER };
    private HashSet<String> departmentSet = new HashSet<String>(Arrays.asList(departmentArray));

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

        String data = value.toString();
        // Limit -1 keeps trailing empty fields, so a valid row always has 9 columns.
        String[] field = data.split(",", -1);

        // Ignore malformed rows and rows with an empty department column.
        if (field.length == 9 && field[3].length() > 0) {
            // Department is in the 4th column (index 3); normalize case so
            // lowercase values such as "fire" match the counter names.
            String department = field[3].toUpperCase();
            if (departmentSet.contains(department)) {
                // Increment the custom counter named after the department.
                context.getCounter(DEPARTMENT_GROUP, department).increment(1);
            }
        }
    }
}
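Before wiring the mapper into a job, its per-record logic can be exercised on the seven sample rows above with a dependency-free sketch. The class below is hypothetical and only mirrors the mapper's split-validate-count steps; from the sample data it should count 3 FIRE, 3 POLICE, and 1 LAW employee.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class MapperLogicSketch {

    static final Set<String> DEPARTMENTS = Set.of("POLICE", "FIRE", "LAW");

    // Mirrors the mapper: split each record, validate it, normalize the
    // department's case, then count matching departments.
    static Map<String, Long> countDepartments(String[] records) {
        Map<String, Long> counters = new HashMap<>();
        for (String record : records) {
            String[] field = record.split(",", -1);
            if (field.length == 9 && field[3].length() > 0) {
                String department = field[3].toUpperCase();
                if (DEPARTMENTS.contains(department)) {
                    counters.merge(department, 1L, Long::sum);
                }
            }
        }
        return counters;
    }
}
```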
Driver Code

If the job completes successfully, we get the DEPARTMENT_GROUP counter group and write out each counter's name and value to stdout. These counter values are also output when the job completes, so writing to stdout may be redundant if you are obtaining these values from log files.


import java.io.File;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DriverCounter {

    public static void main(String[] args) throws Exception {

        /*
         * I have used my local path in Windows; change the paths as per
         * your local machine.
         */
        args = new String[] { "Replace this string with Input Path location",
                "Replace this string with output Path location" };

        if (args.length != 2) {
            System.err.println("Please specify the input and output path");
            System.exit(-1);
        }

        /* Delete the output directory before running the job. */
        FileUtils.deleteDirectory(new File(args[1]));

        /* Set the Hadoop system parameter. */
        System.setProperty("hadoop.home.dir", "Replace this string with hadoop home directory location");

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(DriverCounter.class);
        job.setJobName("Counters_Example");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(CustomEmployeeCounter.class);
        // Map-only job: no reducers are needed for counting.
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(NullWritable.class);

        int code = job.waitForCompletion(true) ? 0 : 1;

        if (code == 0) {
            // On success, print each counter in our custom group.
            for (Counter counter : job.getCounters().getGroup(CustomEmployeeCounter.DEPARTMENT_GROUP)) {
                System.out.println(counter.getDisplayName() + "\t" + counter.getValue());
            }
        }
    }
}

Sample Output

FIRE      4800
LAW       405
POLICE    12973