MapReduce example to partition data using a custom partitioner

The partitioning pattern moves records into categories (i.e. shards, partitions, or bins), but it doesn't care about the order of records within them. The intent is to take similar records in a data set and partition them into distinct, smaller data sets. Partitioning means breaking a large set of data into smaller subsets, chosen by some criterion relevant to your analysis. To improve performance, you can run a job that takes the data set and breaks the partitions out into separate files. Then, when a particular subset of the data is to be analyzed, the job needs to look only at that data.

Partitioning can also help when you have several different types of records in the same data set, which is increasingly common in NoSQL. For example, in HTTP server logs you'll have GET and POST requests, internal system messages, and error messages. An analysis may care about only one category of this data, so partitioning it into these categories narrows down the data the job runs over before it even starts. The one major requirement for applying this pattern is knowing how many partitions you are going to have ahead of time. For example, if you know you are going to partition by day of the week, you know that you will have seven partitions. You can get around this requirement by running an analytic that determines the number of partitions. For example, if you have a bunch of timestamped data but don't know how far back it spans, run a job that figures out the date range for you.
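
A minimal sketch of such a pre-pass for the timestamped case could send every timestamp to a single reducer that tracks the minimum and maximum. The column index, the ISO date format, and the single-reducer setup are assumptions for illustration, not part of the example job further below.

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class DateRangeJob {

    /* emits every timestamp under one constant key */
    public static class DateMapper extends Mapper<Object, Text, Text, Text> {
        private static final Text ALL = new Text("range");

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] field = value.toString().split(",", -1);
            if (field.length > 4 && field[4].length() > 0) {   // assumed timestamp column
                context.write(ALL, new Text(field[4]));
            }
        }
    }

    /* the single reducer keeps the lexicographic min and max (works for ISO dates such as 2016-01-31) */
    public static class DateRangeReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            String min = null, max = null;
            for (Text value : values) {
                String date = value.toString();
                if (min == null || date.compareTo(min) < 0) min = date;
                if (max == null || date.compareTo(max) > 0) max = date;
            }
            context.write(key, new Text(min + " to " + max));
        }
    }
}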

Structure

All you have to do for this pattern is define, in a custom partitioner, the function that determines which partition a record will go to. The custom partitioner is the meat of this pattern: it decides which reducer each record is sent to, and each reducer corresponds to a particular partition. The output folder of the job will have one part file for each partition.

Note: Since each category will be written out to one large file, this is a great place to store the data in block-compressed SequenceFiles, which are arguably the most efficient and easy-to-use data format in Hadoop.
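
If you want each partition written as a block-compressed SequenceFile, the driver shown later could be extended roughly as in this sketch (assuming the default compression codec is acceptable; these lines would go after the Job is created):

import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

/* write each partition as a block-compressed SequenceFile instead of plain text */
job.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setCompressOutput(job, true);
SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);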

Use Cases

  1. Partition pruning by continuous value – You have some sort of continuous variable, such as a date or numerical value, and at any one time you care about only a certain subset of that data. Partitioning the data into bins allows your jobs to load only the pertinent data.
  2. Partition pruning by category – Records fit into one of several clearly defined categories, such as country, phone area code, or language, and at any one time you care about only some of those categories.
  3. Sharding – A system in your architecture has divisions of data, such as different disks, and you need to partition the data into these existing shards.

Performance analysis

The main performance concern with this pattern is that the resulting partitions will likely not have a similar number of records. Perhaps one partition turns out to hold 50% of the data of a very large data set. If implemented naively, all of this data will get sent to one reducer and will slow down processing significantly. It's pretty easy to get around this, though: split very large partitions into several smaller partitions, even if just randomly, or assign multiple reducers to one partition and then randomly assign records among them to spread the load a bit better.
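
As a rough illustration of the second idea, a partitioner could reserve a small block of partition numbers for a category known to be oversized and scatter its records across that block at random. The category name and block width below are hypothetical, and the sketch assumes numPartitions is larger than HOT_PARTITIONS; it is not part of the example job that follows.

import java.util.Random;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class SkewAwarePartitioner extends Partitioner<Text, Text> {

    private static final String HOT_CATEGORY = "SOME_VERY_LARGE_DEPARTMENT"; // hypothetical hot key
    private static final int HOT_PARTITIONS = 4;                             // reducers reserved for it
    private final Random random = new Random();

    @Override
    public int getPartition(Text key, Text value, int numPartitions) {
        if (HOT_CATEGORY.equals(key.toString())) {
            /* spread the oversized category over the last HOT_PARTITIONS reducers */
            return numPartitions - 1 - random.nextInt(HOT_PARTITIONS);
        }
        /* everything else hashes into the remaining partitions */
        return Math.abs(key.toString().hashCode() % (numPartitions - HOT_PARTITIONS));
    }
}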

Problem to solve

Given a list of employees with their department, partition the data by department so that further analysis can be run separately on a per-department basis.

Here is a sample of the input data, attached as ems_data.csv.

Driver Code

The job needs to be configured to use the custom-built partitioner. The number of reducers also needs to be set here; we set it to 38 because we have 38 different departments, so that the full range of partitions is accounted for. Be conscious of how many reduce slots your cluster has when selecting the number of reducers for your job. A good start would be close to the number of reduce slots for reasonably sized data sets, or twice the number of reduce slots for very large data sets.


import java.io.File;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DriverCustomPartioner {

    public static void main(String[] args) throws Exception {

        /*
         * I have used my local path in Windows; change the paths as per your
         * local machine
         */
        args = new String[] { "Replace this string with Input Path location",
                "Replace this string with output Path location" };

        /* delete the output directory before running the job */
        FileUtils.deleteDirectory(new File(args[1]));

        /* set the hadoop system parameter */
        System.setProperty("hadoop.home.dir", "Replace this string with hadoop home directory location");

        if (args.length != 2) {
            System.err.println("Please specify the input and output path");
            System.exit(-1);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        /* route records to reducers by department via the custom partitioner */
        job.setPartitionerClass(DepartmentPartitioner.class);
        job.setJarByClass(DriverCustomPartioner.class);
        job.setJobName("Partioning_Pattern");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(DepartmentPartitionMapper.class);
        job.setReducerClass(PartitionerReducer.class);
        /* the mapper emits (department, record); the reducer emits (record, null) */
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        /* one reduce task per expected department */
        job.setNumReduceTasks(38);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Mapper Code

In the mapper class we split the input record using a comma as the delimiter, and the if condition ignores invalid records. The department name is stored at index 3, so we fetch it and store it in mapperKey. This way the partitioner can do the work of putting each department into its appropriate partition.


import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DepartmentPartitionMapper extends Mapper<Object, Text, Text, Text> {

    private Text mapperKey = new Text();

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

        String data = value.toString();
        String[] field = data.split(",", -1);

        if (null != field && field.length == 9 && field[3].length() > 0) {
            String departmentName = field[3];
            mapperKey.set(departmentName);
            context.write(mapperKey, value);
        }
    }
}

Partitioner Code

The partitioner examines each key/value pair output by the mapper to determine which partition the pair will be written to. Each numbered partition is then copied by its associated reduce task during the reduce phase. The partitioner implements the Configurable interface, so it could also read settings from the job configuration if needed. In the getPartition method we take the hash code of the key modulo the number of partitions, and then take the absolute value to make sure we get a positive number, since a negative partition number would result in an invalid-partition exception.


import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class DepartmentPartitioner extends Partitioner<Text, Text> implements Configurable {

    private Configuration conf = null;

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    @Override
    public Configuration getConf() {
        return conf;
    }

    @Override
    public int getPartition(Text key, Text value, int numPartitions) {
        return Math.abs(key.toString().hashCode() % numPartitions);
    }
}
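
Note that hashing 38 department names into 38 partitions does not guarantee that every department lands in its own partition; two names can hash to the same bin. If you need an exact one-to-one mapping, the Configurable hook shown above gives you a place to load an explicit assignment from the job configuration. The property name and comma-separated format in this sketch are assumptions for illustration, not part of the example job:

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ExplicitDepartmentPartitioner extends Partitioner<Text, Text> implements Configurable {

    private Configuration conf;
    private final Map<String, Integer> partitionByDepartment = new HashMap<String, Integer>();

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
        /*
         * Hypothetical property: a comma-separated list of department names set in the driver,
         * e.g. conf.set("partitioner.departments", "HR,FINANCE,ENGINEERING,...")
         */
        String[] departments = conf.getStrings("partitioner.departments", new String[0]);
        for (int i = 0; i < departments.length; i++) {
            partitionByDepartment.put(departments[i], i);
        }
    }

    @Override
    public Configuration getConf() {
        return conf;
    }

    @Override
    public int getPartition(Text key, Text value, int numPartitions) {
        Integer partition = partitionByDepartment.get(key.toString());
        if (partition != null && partition < numPartitions) {
            return partition; // exact, collision-free assignment
        }
        /* fall back to hashing for departments not listed in the configuration */
        return Math.abs(key.toString().hashCode() % numPartitions);
    }
}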

Reducer code

The reducer code is very simple since we simply want to output the values. The work of partitioning has been done at this point.


import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class PartitionerReducer extends Reducer<Text, Text, Text, NullWritable> {

    @Override
    public void reduce(Text text, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        for (Text value : values) {
            context.write(value, NullWritable.get());
        }
    }
}