MapReduce Example to Filter Data

Filtering serves as an abstract pattern for some of the other patterns. Filtering simply evaluates each record separately and decides, based on some condition, whether it should stay or go.

Intent

Filter out records that are not of interest and keep ones that are. Consider an evaluation function f that takes a record and returns a Boolean value of true or false. If this function returns true, keep the record; otherwise, toss it out.
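In MapReduce terms, f lives inside a mapper: evaluate each input record and emit it only when f returns true. Below is a minimal sketch of that shape; the class name GenericFilterMapper and the placeholder condition are hypothetical illustrations, not part of the worked example later in this post.

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class GenericFilterMapper extends Mapper<Object, Text, NullWritable, Text> {

    // f: the evaluation function; plug in any per-record condition here
    private boolean f(Text record) {
        return record.getLength() > 0; // placeholder: keep non-empty records
    }

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        if (f(value)) {
            context.write(NullWritable.get(), value); // f returned true: keep the record
        }
        // when f returns false, the record is simply dropped
    }
}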

Use cases for this pattern
  • Closer view of data – Prepare a particular subset of data, where the records have something in common or something of interest, for further examination.
  • Tracking a thread of events – Extract a thread of consecutive events as a case study from a larger data set.
  • Distributed grep – Grep, a very powerful tool that uses regular expressions to find lines of text of interest, is easily parallelized.
  • Data cleansing – Data is sometimes dirty, whether it be malformed, incomplete, or in the wrong format; filtering can be used to discard records that fail validation.
  • Simple random sampling – If you want a simple random sample of your data set, you can use filtering with an evaluation function that randomly returns true or false (see the sketch after this list).
  • Removing low scoring data – If you can score your data with some sort of scalar value, you can filter out records that don’t meet a certain threshold.
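As a concrete illustration of the random-sampling use case, here is a minimal sketch of a mapper whose evaluation function keeps each record with a fixed probability. The class name RandomSamplingMapper and the filter_percentage configuration key are hypothetical, chosen just for this sketch:

import java.io.IOException;
import java.util.Random;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class RandomSamplingMapper extends Mapper<Object, Text, NullWritable, Text> {

    private double percentage;
    private Random rand = new Random();

    @Override
    public void setup(Context context) {
        // hypothetical key: the fraction of records to keep, defaulting to 1%
        percentage = context.getConfiguration().getDouble("filter_percentage", 0.01);
    }

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // the evaluation function f is simply a random draw
        if (rand.nextDouble() < percentage) {
            context.write(NullWritable.get(), value);
        }
    }
}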

This pattern is basically as efficient as MapReduce can get, because the job is map-only. There are a couple of reasons why map-only jobs are efficient:
  • Since no reducers are needed, data never has to be transmitted between the map and reduce phases. Most of the map tasks pull data off their locally attached disks and then write back out to that node.
  • Both the sort phase and the reduce phase are cut out. This usually doesn’t take very long, but every little bit helps.
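To get a map-only job in Hadoop, the driver must explicitly ask for zero reduce tasks; otherwise the framework runs one default reducer, the data is shuffled and sorted anyway, and the output files carry the part-r- prefix. It is a one-line setting:

job.setNumReduceTasks(0); // map-only: no shuffle, no sort; mapper output is written directly by the output format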

One thing to be aware of is the size and number of the output files. Since this job runs with mappers only, you will get one output file per mapper, with the prefix part-m- (note the m instead of the r). You may find that these files are tiny if you filter out a lot of data, which can run into the NameNode’s scalability limits on the number of small files further down the road. If you are worried about the number of small files and do not mind the job running a little longer, you can use an identity reducer to collect the results without doing anything with them, as shown below. This makes the mappers send all of the data to the reducers, but the reducers do nothing other than write it back out, producing one file per reducer. The appropriate number of reducers depends on the amount of data that will be written to the file system and how many small files you are willing to deal with.
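A minimal sketch of the identity-reducer variant, assuming the new-API base class org.apache.hadoop.mapreduce.Reducer (whose default reduce method passes every record through unchanged); the reducer count of 1 is just an illustration:

job.setReducerClass(Reducer.class); // base Reducer is an identity reducer: records pass through unchanged
job.setNumReduceTasks(1);           // all filtered records are collected into a single part-r-00000 file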

Problem to Solve

1. Given a list of employees with their department, filter out the employees who are not part of the POLICE department.

Here is a sample of the input data from the attached employee_info.csv file.

Input Data sample

First Name,Last Name,Job Titles,Department,Full or Part-Time,Salary or Hourly,Typical Hours,Annual Salary,Hourly Rate

dubert,tomasz ,paramedic i/c,fire,f,salary,,91080.00,
edwards,tim p,lieutenant,fire,f,salary,,114846.00,
elkins,eric j,sergeant,police,f,salary,,104628.00,
estrada,luis f,police officer,police,f,salary,,96060.00,
ewing,marie a,clerk iii,police,f,salary,,53076.00,
finn,sean p,firefighter,fire,f,salary,,87006.00,
fitch,jordan m,law clerk,law,f,hourly,35,,14.51

Mapper Code

In the mapper class we split each input record using a comma as the delimiter. The department name is stored at index 3 (the fourth column), so we fetch it and match it against a regex, ignoring records that are not part of the police department. We use the setup method to retrieve the regex from the job configuration and compile it once per task. Because the sample data stores department names in lowercase while the driver passes POLICE, the pattern is compiled case-insensitively.

import java.io.IOException;
import java.util.regex.Pattern;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DepartmentFilterMapper extends Mapper<Object, Text, NullWritable, Text> {

    private Pattern departmentPattern;

    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        // compile the regex once per task; CASE_INSENSITIVE so "police" matches "POLICE"
        String regex = context.getConfiguration().get("departmentRegex");
        departmentPattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE);
    }

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // split on commas, keeping trailing empty fields (e.g. the empty Hourly Rate column)
        String[] field = value.toString().split(",", -1);
        String departmentName = field[3]; // Department is the fourth column (index 3)
        // keep only the records whose department matches the configured pattern
        if (departmentPattern.matcher(departmentName).matches()) {
            context.write(NullWritable.get(), value);
        }
    }
}

Driver Code

Finally, we use the driver class to check that everything works as expected. The output will contain the records of the employees who are part of the police department.


import java.io.File;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.hadoop.design.summarization.blog.ConfigurationFactory;

public class FilterDriver {

    public static void main(String[] args) throws Exception {

        /*
         * I have used my local path in Windows; change the paths as per your
         * local machine
         */
        args = new String[] { "Replace this string with Input Path location",
                "Replace this string with output Path location" };

        if (args.length != 2) {
            System.err.println("Please specify the input and output path");
            System.exit(-1);
        }

        /* delete the output directory before running the job */
        FileUtils.deleteDirectory(new File(args[1]));

        /* set the hadoop system parameter */
        System.setProperty("hadoop.home.dir", "Replace this string with hadoop home directory location");

        Configuration conf = ConfigurationFactory.getInstance();
        conf.set("departmentRegex", "POLICE");

        Job job = Job.getInstance(conf);
        job.setJarByClass(FilterDriver.class);
        job.setJobName("Find_Filter_Department");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(DepartmentFilterMapper.class);
        job.setNumReduceTasks(0); // map-only job: output files will be named part-m-*
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
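Running the driver against the seven sample rows above, the single output file (part-m-00000, since the job is map-only) should contain exactly the three police records:

elkins,eric j,sergeant,police,f,salary,,104628.00,
estrada,luis f,police officer,police,f,salary,,96060.00,
ewing,marie a,clerk iii,police,f,salary,,53076.00,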