mapreduce example to find the distinct set of data

This Pattern exploits MapReduce’s ability to group keys together to remove duplicates. This pattern uses a mapper to transform the data and doesn’t do much in the reducer. The combiner can always be utilized in this pattern and can help considerably if there are a large number of duplicates. Duplicate records are often located close to another in a data set, so a combiner will deduplicate them in the map phase.The mapper takes each record and extracts the data fields for which we want unique values.The mapper outputs the record as the key, and null as the value.The reducer groups the nulls together by key, so we’ll have one null per key. We then simply output the key, since we don’t care how many nulls we have. Because each key is grouped together, the output data set is guaranteed to be unique.One nice feature of this pattern is that the number of reducers doesn’t matter in terms of the calculation itself. Set the number of reducers relatively high, since the mappers will forward almost all their data to the reducers.

Note: This is a good time to resize your data file sizes. If you want your output files to be larger, reduce the number of reducers. If you want them smaller, increase the number of reducers. The files will come out to be about the same size thanks to the random hashing in the partitioner.

Use Cases this pattern solves
  1. Deduplicate data – IF the data from the source has duplicate records it can be removed.
  2. Getting distinct values – This is useful when your raw records may not be duplicates, but the extracted informationis duplicated across records. For example lets say you want to run some analysis based on the number of unique users to a website in that case we need to filter out data from the same user.
  3. Protecting from an inner join explosion – If you are about to do an inner join between two data sets and your foreign keys are not unique, you risk retrieving a huge number of records. For example, if you have 1000 of the same key in one data set, and 2,000 of the same key in the other data set, you’ll end up with 2,000,000 records, all sent to one reducer! By running the distinct pattern, you can pair down your values to make sure they are unique and mitigate against this problem.
Pig code to achieve distinct

distinct_records = DISTINCT data;

dump distinct_records ;

Performance analysis

The main consideration in determining how to set up the MapReduce job is the number of reducers you think you will need. The number of reducers is highly dependent on the total number of records and bytes coming out of the mappers, which is dependent on how much data the combiner is able to eliminate. Basically, if duplicates are very rare within an input split (and thus the combiner did almost nothing), pretty much all of the data is going to be sent to the reduce phase.You can find the number of output bytes and records by looking at the JobTracker status of the job on a sample run. Take the number of output bytes and divide by the number of reducers you are thinking about using. That is about how many bytes each reducer will get, not accounting for skew. The number that a reducer can handle varies from deployment to deployment, but usually you shouldn’t pass it more than a few hundred megabytes. You also don’t want to pass too few records, because then your output files will be tiny and there will be unnecessary overhead in spinning up the reducers. Aim for each reducer to receive more than the block size of records.

Note : Be conscious of how many reduce slots your cluster has when selecting the number of reducers of your job. A good start for the distinct pattern would be close to the number of reduce slots for reasonably sized data sets or twice the number of reduce slots for very large data sets.

 

Problem to Solve :

1. Lets solve a problem from the telecom domain . The data we receive from the EMS has some duplicate data which we want to filter out before ingesting the data into lets say hive table. We will be determining the uniqueness of the record based on the device_id,port_id and the timestamp these records are in the index number 0,1 and 3 respectively.

Here is a sample input data attached ems_data.csv

Mapper Code

The mapper will get the device_id,port_id and the timestamp from the record which are in the index number 0,1 and 3 respectively.We are using a stringbuilder and we are appending the three fields into the stringbuilder to form the key. And finally we are outputting the key and value as the record as is without modifying it.

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DistinctRecordMapper extends Mapper<Object, Text, Text, Text> {

private Text keyValue = new Text();
private StringBuilder keyFormat=new StringBuilder();

@Override
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

keyFormat.setLength(0);
String data = value.toString();
String[] field = data.split(",", -1);
if (null != field && field.length > 10) {
keyFormat.append(field[0]+field[1]+field[3]);
keyValue.set(keyFormat.toString());
context.write(keyValue, value);
}

}

}
Reducer Code

The maximum work for building a distinct set of values is handled by the MapReduce framework. Each reducer is given a unique key and a set of values. We are iterating over the values and outputting the value only once and we break out the loop so we can get distinct set of records.

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DistinctRecordReducer extends Reducer<Text, Text, NullWritable, Text> {

@Override
public void reduce(Text text, Iterable<Text> values, Context context) throws IOException,
InterruptedException {

for (Text value : values) {
context.write(NullWritable.get(),value);
break;
}
}

}
Combiner optimization

A combiner can and should be used in the distinct pattern. Duplicate keys will be removed from each local map’s output, thus reducing the amount of network I/O required. The same code for the reducer can be used in the combiner.

Driver Code

Finally we will use the driver class to test everything is working fine as expected . The output will contain distinct set of records.

import java.io.File;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.hadoop.design.summarization.blog.ConfigurationFactory;

public class DriverDistinctRecord {

public static void main(String[] args) throws Exception {

/*
* I have used my local path in windows change the path as per your
* local machine
*/

args = new String[] { "Replace this string with Input Path location",
"Replace this string with output Path location" };

/* delete the output directory before running the job */

FileUtils.deleteDirectory(new File(args[1]));

/* set the hadoop system parameter */

System.setProperty("hadoop.home.dir", "Replace this string with hadoop home directory location");

if (args.length != 2) {
System.err.println("Please specify the input and output path");
System.exit(-1);
}

Configuration conf = ConfigurationFactory.getInstance();
Job job = Job.getInstance(conf);
job.setJarByClass(DriverDistinctRecord.class);
job.setJobName("Distinct_Record");
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(DistinctRecordMapper.class);
job.setReducerClass(DistinctRecordReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);

}
}