In simple random sampling (SRS), we want to grab a subset of a larger data set in which each record has an equal probability of being selected. Typically this is useful for sizing down a data set so that representative analysis can be done on a more manageable amount of data.
We use a random number generator to produce a value for each record; if the value is below a threshold, we keep the record. Otherwise, we toss it out.
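To make the idea concrete before bringing Hadoop into it, here is a minimal sketch in plain Java (the record list and the 20% threshold are made up for the example):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class SimpleRandomSample {
    public static void main(String[] args) {
        // Toy records standing in for lines of a larger data set
        List<String> records = Arrays.asList("rec1", "rec2", "rec3", "rec4", "rec5");
        double threshold = 0.20; // keep roughly 20% of the records
        Random random = new Random();
        List<String> sample = new ArrayList<String>();
        for (String record : records) {
            // nextDouble() is uniform on [0, 1), so each record has the same
            // independent chance of falling below the threshold
            if (random.nextDouble() < threshold) {
                sample.add(record);
            }
        }
        System.out.println(sample);
    }
}

On a data set of any real size, the fraction of records kept will be close to the threshold.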
Problem to Solve
1. Given a list of employees with their information, output a random sample (a subset of the larger data set) that can be used for representative analysis of the data.
Here is a sample of the input data, attached as employee_info.csv.
Input Data sample
First Name,Last Name,Job Titles,Department,Full or Part-Time,Salary or Hourly,Typical Hours,Annual Salary,Hourly Rate
dubert,tomasz ,paramedic i/c,fire,f,salary,,91080.00,
edwards,tim p,lieutenant,fire,f,salary,,114846.00,
elkins,eric j,sergeant,police,f,salary,,104628.00,
estrada,luis f,police officer,police,f,salary,,96060.00,
ewing,marie a,clerk iii,police,f,salary,,53076.00,
finn,sean p,firefighter,fire,f,salary,,87006.00,
fitch,jordan m,law clerk,law,f,hourly,35,,14.51
Mapper Code
In the mapper code, the setup function pulls the sampling_percentage configuration value so we can use it in the map function. In the map function, a simple check against the next random number is done. The random number will be anywhere between 0 and 1, so by comparing it against the specified threshold, we can keep or throw out the record.

As this is a map-only job, there is no combiner or reducer. All output records will be written directly to the file system. When using a small percentage, you will find that the output files are tiny and plentiful. If this is the case, collect the files as a post-processing step using hadoop fs -cat, or set the number of reducers to 1 without specifying a reducer class, which tells the MapReduce framework to use a single identity reducer that simply collects the output into one file.
import java.io.IOException;
import java.util.Random;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class EmployeeSampling extends Mapper<Object, Text, NullWritable, Text> {

    private Random randomNumber = new Random();
    private Double samplingPercentage;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Pull the configured percentage and convert it to a threshold between 0 and 1
        String percentage = context.getConfiguration().get("sampling_percentage");
        samplingPercentage = Double.parseDouble(percentage) / 100.0;
    }

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // Keep the record only if the next uniform random value falls below the threshold
        if (randomNumber.nextDouble() < samplingPercentage) {
            context.write(NullWritable.get(), value);
        }
    }
}
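If you want to convince yourself that the threshold math in setup and map behaves as expected, a quick standalone check (outside Hadoop; the trial count is arbitrary) is to count how often a record would be kept:

import java.util.Random;

public class SamplingSanityCheck {
    public static void main(String[] args) {
        // Same conversion the mapper's setup() performs for "20"
        double samplingPercentage = Double.parseDouble("20") / 100.0;
        Random randomNumber = new Random();
        int trials = 1000000;
        int kept = 0;
        for (int i = 0; i < trials; i++) {
            // Same keep/drop test the map() method applies per record
            if (randomNumber.nextDouble() < samplingPercentage) {
                kept++;
            }
        }
        System.out.println("Fraction kept: " + (double) kept / trials); // prints roughly 0.20
    }
}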
Driver Code
Finally, we use the driver class to verify that everything works as expected. The output will contain a sample of employee records that can be used for further analysis.
import java.io.File;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.hadoop.design.summarization.blog.ConfigurationFactory;

public class SamplingDriver {

    public static void main(String[] args) throws Exception {
        /*
         * I have used my local path in Windows; change the path as per your
         * local machine
         */
        args = new String[] { "Replace this string with Input Path location",
                "Replace this string with output Path location" };

        if (args.length != 2) {
            System.err.println("Please specify the input and output path");
            System.exit(-1);
        }

        /* Delete the output directory before running the job */
        FileUtils.deleteDirectory(new File(args[1]));

        /* Set the hadoop system parameter */
        System.setProperty("hadoop.home.dir", "Replace this string with hadoop home directory location");

        Configuration conf = ConfigurationFactory.getInstance();
        // Keep roughly 20% of the input records
        conf.set("sampling_percentage", "20");

        Job job = Job.getInstance(conf);
        job.setJarByClass(SamplingDriver.class);
        job.setJobName("Sampling_Employees");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(EmployeeSampling.class);
        // Map-only job: with zero reducers, mapper output goes straight to the file system
        job.setNumReduceTasks(0);

        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
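If a single consolidated output file is preferred over one part file per mapper, the alternative mentioned earlier is to funnel the sampled records through one identity reducer. A one-line change in the driver (replacing the setNumReduceTasks(0) call above) does this:

// Use a single identity reducer (no reducer class is set, so the framework
// supplies the identity reducer) to collect all samples into one file
job.setNumReduceTasks(1);

Equivalently, you can leave the job map-only and merge the part files afterwards with hadoop fs -cat.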