MapReduce Example for Simple Random Sampling of Data

In simple random sampling (SRS), we want to grab a subset of a larger data set in which each record has an equal probability of being selected. Typically this is useful for sizing down a data set so that representative analysis can be done on a more manageable volume of data.
We use a random number generator to produce a value for each record; if the value is below a threshold, we keep the record. Otherwise, we toss it out.
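To make the idea concrete, here is a minimal standalone sketch of the per-record decision (the class name, the 20 percent threshold, and the record values are made up for illustration):

import java.util.Random;

public class SrsSketch {
    public static void main(String[] args) {
        Random random = new Random();
        double threshold = 0.20; // keep roughly 20% of records (illustrative value)
        String[] records = { "record-1", "record-2", "record-3", "record-4", "record-5" };
        for (String record : records) {
            // nextDouble() is uniform on [0.0, 1.0), so each record is kept
            // with probability equal to the threshold
            if (random.nextDouble() < threshold) {
                System.out.println("kept: " + record);
            }
        }
    }
}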

Problem to Solve

1. Given a list of employees and their information, output a random sample (a subset of the larger data set) that can be used for representative analysis of the data.

A sample of the input data is attached as employee_info.csv.

Input Data Sample

First Name,Last Name,Job Titles,Department,Full or Part-Time,Salary or Hourly,Typical Hours,Annual Salary,Hourly Rate

dubert,tomasz ,paramedic i/c,fire,f,salary,,91080.00,
edwards,tim p,lieutenant,fire,f,salary,,114846.00,
elkins,eric j,sergeant,police,f,salary,,104628.00,
estrada,luis f,police officer,police,f,salary,,96060.00,
ewing,marie a,clerk iii,police,f,salary,,53076.00,
finn,sean p,firefighter,fire,f,salary,,87006.00,
fitch,jordan m,law clerk,law,f,hourly,35,,14.51

Mapper Code

In the mapper code, the setup function pulls the sampling_percentage configuration value so it can be used in the map function. In the map function, a simple check against the next random number is done. The random number will be anywhere between 0 and 1, so by comparing it against the specified threshold, we can keep or throw out the record.

As this is a map-only job, there is no combiner or reducer. All output records are written directly to the file system. When using a small percentage, you will find that the output files are tiny and plentiful. If this is the case, collect the files as a post-processing step using hadoop fs -cat, or set the number of reducers to 1 without specifying a reducer class, which tells the MapReduce framework to use a single identity reducer that simply collects the output into a single file (a sketch of this option follows the mapper code below).

import java.io.IOException;
import java.util.Random;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class EmployeeSampling extends Mapper<Object, Text, NullWritable, Text> {

    private Random randomNumber = new Random();
    private double samplingPercentage;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Read the sampling percentage from the job configuration and
        // convert it to a threshold between 0 and 1.
        String percentage = context.getConfiguration().get("sampling_percentage");
        samplingPercentage = Double.parseDouble(percentage) / 100.0;
    }

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // nextDouble() is uniform on [0.0, 1.0), so each record is kept
        // with probability equal to the threshold.
        if (randomNumber.nextDouble() < samplingPercentage) {
            context.write(NullWritable.get(), value);
        }
    }
}
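If you want the single-file option mentioned above, a minimal sketch of the driver-side change (assuming the job object built in the driver below) is:

job.setNumReduceTasks(1);
// No reducer class is set, so the framework falls back to the identity
// reducer, which passes every map output record straight through and
// produces a single output file.

This trades a shuffle of the sampled records for the convenience of one output file; for small samples that cost is usually negligible.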
Driver Code

Finally, we use the driver class to check that everything works as expected. The output will contain a random sample of employee records that can be used for further analysis.

import java.io.File;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.hadoop.design.summarization.blog.ConfigurationFactory;

public class SamplingDriver {

    public static void main(String[] args) throws Exception {

        /*
         * Local paths are hardcoded here for testing on a Windows machine.
         * Change them for your environment, or remove this block to pass
         * the paths as command-line arguments.
         */
        args = new String[] { "Replace this string with Input Path location",
                "Replace this string with output Path location" };

        if (args.length != 2) {
            System.err.println("Please specify the input and output path");
            System.exit(-1);
        }

        /* set the hadoop home directory for the local environment */
        System.setProperty("hadoop.home.dir", "Replace this string with hadoop home directory location");

        /* delete the output directory before running the job */
        FileUtils.deleteDirectory(new File(args[1]));

        Configuration conf = ConfigurationFactory.getInstance();
        // Pass the sampling percentage to the mappers; 20 keeps roughly 20% of records.
        conf.set("sampling_percentage", "20");

        Job job = Job.getInstance(conf);
        job.setJarByClass(SamplingDriver.class);
        job.setJobName("Sampling_Employees");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Map-only job: only the mapper class is set, no combiner or reducer.
        job.setMapperClass(EmployeeSampling.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
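With sampling_percentage set to 20, each record is kept with probability 0.2, so the output should hold roughly a fifth of the input records. If the job runs on a cluster and the sample ends up spread across many small part files, they can be collected into one local file with the hadoop fs -cat step mentioned earlier; a sketch, with the output path and local file name as placeholders:

hadoop fs -cat <output path>/part-m-* > sampled_employees.csv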