MapReduce Custom Writable and WritableComparable Example

In the example below, we create a custom Writable that can be used as a key in the mapper. The implementation represents an employee object.

All Writable implementations must have a default (no-argument) constructor so that the MapReduce framework can instantiate them and then populate their fields by calling readFields().

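The write() method in the code below serializes each Text field (via Text.write()) and the Double salary (via DataOutput.writeDouble()) to the output stream. Similarly, readFields() deserializes the bytes from the input stream using Text.readFields() and DataInput.readDouble(), reading the fields in the same order in which write() wrote them.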

We should also override the hashCode(), equals(), and toString() methods from java.lang.Object. The hashCode() method is used by HashPartitioner, the default partitioner in MapReduce, to choose the reduce partition for each key.

Because we use the Employee object as a key, we need to implement the WritableComparable interface. Its compareTo() method imposes the ordering used to sort keys, and that ordering should be consistent with equals().


import java.io.*;
import java.util.Objects;

import org.apache.hadoop.io.*;

/**
 * An employee record that can be used as a MapReduce key.
 *
 * <p>Instances are serialized with {@link #write(DataOutput)} and restored with
 * {@link #readFields(DataInput)}. Both methods handle the seven {@link Text} fields followed by the
 * salary, always in the same order.
 *
 * <p>Keys are ordered by {@link #compareTo(Employee)}, which compares fields in this order: first
 * name, last name, designation, department, job type, salary-or-hourly, typical hours, and finally
 * salary. This ordering is consistent with {@link #equals(Object)}.
 *
 * <p>The getters return the internal {@link Text}/{@link Double} references, not copies.
 * {@link #readFields(DataInput)} overwrites the {@code Text} instances in place, so copy any value
 * you need to keep.
 *
 * <p>{@link #write(DataOutput)} and {@link #compareTo(Employee)} dereference every field, so all
 * fields must be non-null before an instance is serialized or compared.
 */
public class Employee implements WritableComparable<Employee> {

    private Text fname;
    private Text lname;
    private Text designation;
    private Text department;
    private Text jobtype;
    private Text salaryOrHourly;
    private Text typicalHours;
    private Double salary;

    /**
     * Creates an employee with empty text fields and a salary of 0.0.
     *
     * <p>The framework needs this no-argument constructor. It instantiates the object and then
     * fills it by calling {@link #readFields(DataInput)}.
     */
    public Employee() {
        set(new Text(), new Text(), new Text(), new Text(), new Text(), new Text(), new Text(), 0.0);
    }

    /**
     * Creates an employee from plain strings, wrapping each one in a new {@link Text}.
     *
     * @param fname first name
     * @param lname last name
     * @param designation designation
     * @param department department
     * @param jobtype job type
     * @param salaryOrHourly salary-or-hourly indicator
     * @param typicalHours typical hours
     * @param d salary
     */
    public Employee(String fname, String lname, String designation, String department, String jobtype,
            String salaryOrHourly, String typicalHours, Double d) {
        set(new Text(fname), new Text(lname), new Text(designation), new Text(department), new Text(jobtype),
                new Text(salaryOrHourly), new Text(typicalHours), d);
    }

    /** @return the first name (internal instance, not a copy) */
    public Text getFname() {
        return fname;
    }

    /** @return the last name (internal instance, not a copy) */
    public Text getLname() {
        return lname;
    }

    /** @return the designation (internal instance, not a copy) */
    public Text getDesignation() {
        return designation;
    }

    /** @return the department (internal instance, not a copy) */
    public Text getDepartment() {
        return department;
    }

    /** @return the job type (internal instance, not a copy) */
    public Text getJobtype() {
        return jobtype;
    }

    /** @return the salary-or-hourly indicator (internal instance, not a copy) */
    public Text getSalaryOrHourly() {
        return salaryOrHourly;
    }

    /** @return the typical hours (internal instance, not a copy) */
    public Text getTypicalHours() {
        return typicalHours;
    }

    /** @return the salary */
    public Double getSalary() {
        return salary;
    }

    /**
     * Replaces every field with the given references; the arguments are stored, not copied.
     *
     * @param fname first name
     * @param lname last name
     * @param designation designation
     * @param department department
     * @param jobtype job type
     * @param salaryOrHourly salary-or-hourly indicator
     * @param typicalHours typical hours
     * @param d salary
     */
    public void set(Text fname, Text lname, Text designation, Text department, Text jobtype, Text salaryOrHourly,
            Text typicalHours, Double d) {
        this.fname = fname;
        this.lname = lname;
        this.designation = designation;
        this.department = department;
        this.jobtype = jobtype;
        this.salaryOrHourly = salaryOrHourly;
        this.typicalHours = typicalHours;
        this.salary = d;
    }

    /**
     * Serializes the seven text fields followed by the salary.
     *
     * <p>The field order must match {@link #readFields(DataInput)}.
     *
     * @param out destination stream
     * @throws IOException if writing to {@code out} fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        fname.write(out);
        lname.write(out);
        designation.write(out);
        department.write(out);
        jobtype.write(out);
        salaryOrHourly.write(out);
        typicalHours.write(out);
        out.writeDouble(salary); // unboxes: a null salary throws NullPointerException here
    }

    /**
     * Deserializes the fields in the same order that {@link #write(DataOutput)} wrote them.
     *
     * <p>The existing {@code Text} instances are reused and overwritten in place.
     *
     * @param in source stream
     * @throws IOException if reading from {@code in} fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        fname.readFields(in);
        lname.readFields(in);
        designation.readFields(in);
        department.readFields(in);
        jobtype.readFields(in);
        salaryOrHourly.readFields(in);
        typicalHours.readFields(in);
        salary = in.readDouble();
    }

    /**
     * Returns a hash over all fields; a null field contributes 0.
     *
     * <p>HashPartitioner uses this value to assign keys to reducers, so keep the argument order
     * stable.
     */
    @Override
    public int hashCode() {
        return Objects.hash(department, designation, fname, jobtype, lname, salary, salaryOrHourly, typicalHours);
    }

    /** Two employees are equal when every field is equal; null fields are treated as equal to null. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        Employee other = (Employee) obj;
        return Objects.equals(department, other.department)
                && Objects.equals(designation, other.designation)
                && Objects.equals(fname, other.fname)
                && Objects.equals(jobtype, other.jobtype)
                && Objects.equals(lname, other.lname)
                && Objects.equals(salary, other.salary)
                && Objects.equals(salaryOrHourly, other.salaryOrHourly)
                && Objects.equals(typicalHours, other.typicalHours);
    }

    @Override
    public String toString() {
        return "Employee [fname=" + fname + ", lname=" + lname + ", designation=" + designation + ", department="
                + department + ", jobtype=" + jobtype + ", salaryOrHourly=" + salaryOrHourly + ", typicalHours="
                + typicalHours + ", salary=" + salary + "]";
    }

    /**
     * Orders employees field by field. The fields are compared in this sequence: first name, last
     * name, designation, department, job type, salary-or-hourly, typical hours, and finally salary.
     *
     * @param tp the employee to compare against
     * @return a negative, zero, or positive value as this employee sorts before, equal to, or
     *     after {@code tp}
     */
    @Override
    public int compareTo(Employee tp) {
        int cmp = fname.compareTo(tp.fname);
        if (cmp != 0) {
            return cmp;
        }
        cmp = lname.compareTo(tp.lname);
        if (cmp != 0) {
            return cmp;
        }
        cmp = designation.compareTo(tp.designation);
        if (cmp != 0) {
            return cmp;
        }
        cmp = department.compareTo(tp.department);
        if (cmp != 0) {
            return cmp;
        }
        cmp = jobtype.compareTo(tp.jobtype);
        if (cmp != 0) {
            return cmp;
        }
        cmp = salaryOrHourly.compareTo(tp.salaryOrHourly);
        if (cmp != 0) {
            return cmp;
        }
        cmp = typicalHours.compareTo(tp.typicalHours);
        if (cmp != 0) {
            return cmp;
        }
        // Compare the full double value, not a truncated int difference. Truncation would make
        // 100.25 and 100.75 compare equal even though equals() distinguishes them, and int
        // subtraction can overflow. Double.compare is consistent with Double.equals.
        return Double.compare(salary, tp.salary);
    }
}