Secondary sorting in MapReduce with a custom Writable as the key

We can implement secondary sorting in MapReduce with the following steps (the driver wiring for them is sketched just after the list):

1. Make the key a composite of the natural key and the natural value.
2. The sort comparator should order by the composite key.
3. The partitioner and grouping comparator for the composite key should consider
only the natural key for partitioning and grouping.
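
These steps map directly onto three settings on the MapReduce Job, which we wire up in the driver at the end of this post (the Job instance there is called sampleJob):

sampleJob.setSortComparatorClass(EmployeeComparator.class); // step 2: order by the composite key
sampleJob.setPartitionerClass(EmployeePartitioner.class); // step 3: partition on the natural key
sampleJob.setGroupingComparatorClass(GroupComparator.class); // step 3: group on the natural key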

Problem to Solve

1. Given a list of employees, sort the data by salary (descending) and then by first name.

Input Data Sample


First Name,Last Name,Job Titles,Department,Full or Part-Time,Salary or Hourly,Typical Hours,Annual Salary,Hourly Rate

dubert,tomasz ,paramedic i/c,fire,f,salary,,91080.00,
edwards,tim p,lieutenant,fire,f,salary,,114846.00,
elkins,eric j,sergeant,police,f,salary,,104628.00,
estrada,luis f,police officer,police,f,salary,,96060.00,
ewing,marie a,clerk iii,police,f,salary,,53076.00,
finn,sean p,firefighter,fire,f,salary,,87006.00,
fitch,jordan m,law clerk,law,f,hourly,35,,14.51

Let's code the example.

Mapper Code

We are using the custom Employee object as the key emitted by our mapper, so the Employee class must implement the WritableComparable interface.


import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SortMapper extends Mapper<Object, Text, Employee, NullWritable> {

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

        String data = value.toString();

        // Skip the CSV header row; parsing "Annual Salary" as a double would fail.
        if (data.startsWith("First Name")) {
            return;
        }

        // Split with limit -1 so trailing empty columns (e.g. Hourly Rate) are kept.
        String[] field = data.split(",", -1);

        context.write(new Employee(field[0], field[1], field[2], field[3], field[4], field[5], field[6],
                field[7].length() > 0 ? Double.parseDouble(field[7]) : 0.0), NullWritable.get());
    }

}

Employee Key Code

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class Employee implements WritableComparable<Employee> {

    private Text fname;
    private Text lname;
    private Text designation;
    private Text department;
    private Text jobtype;
    private Text salaryOrHourly;
    private Text typicalHours;
    private Double salary;

    public Employee() {
        // Hadoop instantiates keys reflectively during deserialization,
        // so a no-argument constructor is required.
        set(new Text(), new Text(), new Text(), new Text(), new Text(), new Text(), new Text(), 0.0);
    }

    public Employee(String fname, String lname, String designation, String department, String jobtype,
            String salaryOrHourly, String typicalHours, Double salary) {
        set(new Text(fname), new Text(lname), new Text(designation), new Text(department), new Text(jobtype),
                new Text(salaryOrHourly), new Text(typicalHours), salary);
    }

    public Text getFname() {
        return fname;
    }

    public Text getLname() {
        return lname;
    }

    public Text getDesignation() {
        return designation;
    }

    public Text getDepartment() {
        return department;
    }

    public Text getJobtype() {
        return jobtype;
    }

    public Text getSalaryOrHourly() {
        return salaryOrHourly;
    }

    public Text getTypicalHours() {
        return typicalHours;
    }

    public Double getSalary() {
        return salary;
    }

    public void set(Text fname, Text lname, Text designation, Text department, Text jobtype, Text salaryOrHourly,
            Text typicalHours, Double salary) {
        this.fname = fname;
        this.lname = lname;
        this.designation = designation;
        this.department = department;
        this.jobtype = jobtype;
        this.salaryOrHourly = salaryOrHourly;
        this.typicalHours = typicalHours;
        this.salary = salary;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // Serialize the fields in a fixed order; readFields must mirror it exactly.
        fname.write(out);
        lname.write(out);
        designation.write(out);
        department.write(out);
        jobtype.write(out);
        salaryOrHourly.write(out);
        typicalHours.write(out);
        out.writeDouble(salary);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        fname.readFields(in);
        lname.readFields(in);
        designation.readFields(in);
        department.readFields(in);
        jobtype.readFields(in);
        salaryOrHourly.readFields(in);
        typicalHours.readFields(in);
        salary = in.readDouble();
    }

    @Override
    public int hashCode() {
        // Objects.hash keeps the hashCode/equals contract without the
        // hand-rolled prime-multiplier boilerplate.
        return Objects.hash(department, designation, fname, jobtype, lname, salary, salaryOrHourly, typicalHours);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null || getClass() != obj.getClass())
            return false;
        Employee other = (Employee) obj;
        return Objects.equals(department, other.department)
                && Objects.equals(designation, other.designation)
                && Objects.equals(fname, other.fname)
                && Objects.equals(jobtype, other.jobtype)
                && Objects.equals(lname, other.lname)
                && Objects.equals(salary, other.salary)
                && Objects.equals(salaryOrHourly, other.salaryOrHourly)
                && Objects.equals(typicalHours, other.typicalHours);
    }

    @Override
    public String toString() {
        return "Employee [fname=" + fname + ", lname=" + lname + ", designation=" + designation + ", department="
                + department + ", jobtype=" + jobtype + ", salaryOrHourly=" + salaryOrHourly + ", typicalHours="
                + typicalHours + ", salary=" + salary + "]";
    }

    @Override
    public int compareTo(Employee tp) {

        // Natural, whole-key ordering; the job overrides the shuffle sort with
        // EmployeeComparator, so this mainly backs grouping and equality.
        int cmp = fname.compareTo(tp.fname);
        if (cmp != 0) {
            return cmp;
        }
        cmp = lname.compareTo(tp.lname);
        if (cmp != 0) {
            return cmp;
        }
        cmp = designation.compareTo(tp.designation);
        if (cmp != 0) {
            return cmp;
        }
        cmp = department.compareTo(tp.department);
        if (cmp != 0) {
            return cmp;
        }
        cmp = jobtype.compareTo(tp.jobtype);
        if (cmp != 0) {
            return cmp;
        }
        cmp = salaryOrHourly.compareTo(tp.salaryOrHourly);
        if (cmp != 0) {
            return cmp;
        }
        cmp = typicalHours.compareTo(tp.typicalHours);
        if (cmp != 0) {
            return cmp;
        }
        // Double.compare avoids the precision loss and overflow of
        // subtracting intValue()s.
        return Double.compare(salary, tp.salary);
    }

}
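
A quick way to sanity-check the write/readFields pair is a round trip through Hadoop's in-memory buffers, which should reproduce an equal object. This is a minimal sketch (the EmployeeRoundTripCheck class is purely illustrative, not part of the job):

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public class EmployeeRoundTripCheck {

    public static void main(String[] args) throws Exception {
        Employee original = new Employee("dubert", "tomasz", "paramedic i/c", "fire", "f", "salary", "", 91080.0);

        // Serialize exactly as the framework would during the shuffle.
        DataOutputBuffer out = new DataOutputBuffer();
        original.write(out);

        // Deserialize into a fresh instance and verify equality.
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        Employee copy = new Employee();
        copy.readFields(in);

        System.out.println(original.equals(copy)); // expected: true
    }
}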

Reducer Code

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class SortReducer extends Reducer<Employee, NullWritable, Employee, NullWritable> {

    @Override
    public void reduce(Employee key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {

        // Keys arrive already sorted and grouped; write each one straight through.
        context.write(key, NullWritable.get());
    }

}

Custom Partitioner

We implement a custom partitioner so that only the natural key (here, the department) decides which reducer a record is sent to; records from the same department therefore always land on the same reducer.


import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class EmployeePartitioner extends Partitioner<Employee, NullWritable> {

    @Override
    public int getPartition(Employee key, NullWritable value, int numPartitions) {

        // Mask the sign bit rather than calling Math.abs: Math.abs(Integer.MIN_VALUE)
        // is still negative and would produce an invalid partition number.
        return (key.getDepartment().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

}
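
As a quick illustration (the PartitionerCheck class and the partition count of 4 are hypothetical), two records from the same department should always map to the same partition regardless of salary or name:

import org.apache.hadoop.io.NullWritable;

public class PartitionerCheck {

    public static void main(String[] args) {
        EmployeePartitioner partitioner = new EmployeePartitioner();

        Employee e1 = new Employee("finn", "sean p", "firefighter", "fire", "f", "salary", "", 87006.0);
        Employee e2 = new Employee("edwards", "tim p", "lieutenant", "fire", "f", "salary", "", 114846.0);

        // Same department ("fire"), so both print the same partition number.
        System.out.println(partitioner.getPartition(e1, NullWritable.get(), 4));
        System.out.println(partitioner.getPartition(e2, NullWritable.get(), 4));
    }
}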

Custom Sort Comparator

To sort keys by salary (descending) and then by first name, we plug in a custom sort comparator that compares only those two fields of the composite key.


import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class EmployeeComparator extends WritableComparator {

    protected EmployeeComparator() {
        // true => instantiate Employee objects so compare() receives deserialized keys.
        super(Employee.class, true);
    }

    @Override
    public int compare(WritableComparable one, WritableComparable two) {

        Employee emp1 = (Employee) one;
        Employee emp2 = (Employee) two;

        // Salary descending; Double.compare avoids the precision loss and
        // overflow of subtracting intValue()s.
        int compare = Double.compare(emp2.getSalary(), emp1.getSalary());

        if (compare == 0) {
            // Tie-break on first name, ascending.
            compare = emp1.getFname().compareTo(emp2.getFname());
        }

        return compare;
    }

}
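
As a mental model (again, not part of the job), the shuffle orders keys the same way an in-memory sort with an equivalent comparator would. SortOrderCheck below is a hypothetical harness using rows from the input sample:

import java.util.ArrayList;
import java.util.List;

public class SortOrderCheck {

    public static void main(String[] args) {
        List<Employee> employees = new ArrayList<>();
        employees.add(new Employee("dubert", "tomasz", "paramedic i/c", "fire", "f", "salary", "", 91080.00));
        employees.add(new Employee("edwards", "tim p", "lieutenant", "fire", "f", "salary", "", 114846.00));
        employees.add(new Employee("elkins", "eric j", "sergeant", "police", "f", "salary", "", 104628.00));

        // The same ordering EmployeeComparator applies: salary descending,
        // then first name ascending.
        employees.sort((a, b) -> {
            int cmp = Double.compare(b.getSalary(), a.getSalary());
            return cmp != 0 ? cmp : a.getFname().compareTo(b.getFname());
        });

        employees.forEach(System.out::println); // edwards, elkins, dubert
    }
}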

Grouping Comparator

Once the data reaches a reducer it is grouped by key, and because we have a composite key we control that grouping with a custom grouping comparator. Since this job emits every record rather than aggregating, the comparator below delegates to the full Employee.compareTo, so each distinct record forms its own group; a comparator that looked only at the natural key would collapse records that share it.


import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class GroupComparator extends WritableComparator {

    protected GroupComparator() {
        super(Employee.class, true);
    }

    @Override
    public int compare(WritableComparable one, WritableComparable two) {

        Employee emp1 = (Employee) one;
        Employee emp2 = (Employee) two;

        // Delegate to the composite key's full comparison, so no two distinct
        // records share a reduce group and no rows are collapsed in the output.
        return emp1.compareTo(emp2);
    }

}

Finally, let's write the driver class.


import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class Driver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // Hard-coded paths for running inside the IDE; replace them with real
        // locations or drop this block and pass the paths as program arguments.
        args = new String[] { "Input location path", "Output path" };

        if (args.length != 2) {
            System.err.println("Please specify the input and output path");
            System.exit(-1);
        }

        /* set the hadoop system parameter */
        System.setProperty("hadoop.home.dir", "hadoop path");

        // Remove the output directory from a previous run; the job fails if it exists.
        FileUtils.deleteDirectory(new File(args[1]));

        Configuration conf = new Configuration();
        Job sampleJob = Job.getInstance(conf);

        TextInputFormat.addInputPath(sampleJob, new Path(args[0]));
        TextOutputFormat.setOutputPath(sampleJob, new Path(args[1]));

        sampleJob.setJarByClass(Driver.class);
        sampleJob.setMapperClass(SortMapper.class);
        sampleJob.setPartitionerClass(EmployeePartitioner.class);
        sampleJob.setSortComparatorClass(EmployeeComparator.class);
        sampleJob.setGroupingComparatorClass(GroupComparator.class);
        sampleJob.setReducerClass(SortReducer.class);
        sampleJob.setOutputKeyClass(Employee.class);
        sampleJob.setOutputValueClass(NullWritable.class);

        System.exit(sampleJob.waitForCompletion(true) ? 0 : 1);
    }

}

Output


Employee [fname=BRUNO4, lname=KEVIN D, designation=SERGEANT, department=POLICE, jobtype=F, salaryOrHourly=Salary, typicalHours=, salary=2005.0]
Employee [fname=BRUNO7, lname=KEVIN D, designation=SERGEANT, department=POLICE, jobtype=F, salaryOrHourly=Salary, typicalHours=, salary=2000.0]
Employee [fname=ALLISON3, lname=PAUL W, designation=LIEUTENANT, department=FIRE, jobtype=F, salaryOrHourly=Salary, typicalHours=, salary=1004.0]
Employee [fname=ALLISON3, lname=PAUL W, designation=LIEUTENANT, department=FIRE, jobtype=F, salaryOrHourly=Salary, typicalHours=, salary=900.0]
Employee [fname=BLLISON3, lname=PAUL W, designation=LIEUTENANT, department=WATER, jobtype=F, salaryOrHourly=Salary, typicalHours=, salary=900.0]
Employee [fname=CLLISON3, lname=PAUL W, designation=LIEUTENANT, department=FIRE, jobtype=F, salaryOrHourly=Salary, typicalHours=, salary=900.0]
Employee [fname=VLLISON3, lname=PAUL W, designation=LIEUTENANT, department=WATER, jobtype=F, salaryOrHourly=Salary, typicalHours=, salary=900.0]
Employee [fname=ALLISON2, lname=PAUL W, designation=LIEUTENANT, department=FIRE, jobtype=F, salaryOrHourly=Salary, typicalHours=, salary=700.0]
Employee [fname=BRUNO4, lname=KEVIN D, designation=SERGEANT, department=POLICE, jobtype=F, salaryOrHourly=Salary, typicalHours=, salary=600.0]
Employee [fname=BRUNO, lname=KEVIN D, designation=SERGEANT, department=POLICE, jobtype=F, salaryOrHourly=Salary, typicalHours=, salary=200.0]
Employee [fname=ALLISON, lname=PAUL W, designation=LIEUTENANT, department=WATER, jobtype=F, salaryOrHourly=Salary, typicalHours=, salary=100.0]