Spark secondary sorting example using RDD and DataFrame

We can do secondary sorting in Spark just as we do in MapReduce. We need to define a composite key when we convert the dataset into key-value pairs. For example, if we want to sort on movie name and then on movie rating, we need a key that carries both of these fields. Then we need to follow the steps below.

1. We need to group the data by movie name so that all the ratings of a specific movie land in the same partition during the shuffle. But our key is a composite key with two fields, so simply partitioning by the full key won't work for us. We need to create a custom partitioner that knows which field of the key to use when determining the partition the data will flow to.

2. We also need to tell Spark how we want our data sorted, for example by movie name first and then by rating.

So we need a custom partitioner and a custom ordering (a Comparable key) to do the above, and in the driver code we call pairRdd.repartitionAndSortWithinPartitions(customPartitioner) on the pair RDD.
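As a minimal sketch of such a composite key for the hypothetical movie example above (MovieRatingKey and its fields are illustrative names, not part of the dataset used below), the key implements Comparable so that records are ordered by movie name first and then by rating:


import java.io.Serializable;

// Illustrative composite key for the movie example: name first, then rating.
public class MovieRatingKey implements Comparable<MovieRatingKey>, Serializable {

    private static final long serialVersionUID = 1L;
    private final String movieName;
    private final double rating;

    public MovieRatingKey(String movieName, double rating) {
        this.movieName = movieName;
        this.rating = rating;
    }

    public String getMovieName() {
        return movieName;
    }

    public double getRating() {
        return rating;
    }

    @Override
    public int compareTo(MovieRatingKey other) {
        // Primary sort on movie name, secondary sort on rating.
        int compare = this.movieName.compareTo(other.movieName);
        if (compare == 0) {
            compare = Double.compare(this.rating, other.rating);
        }
        return compare;
    }

}

The rest of this post applies the same pattern to an employee dataset.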

Input Data sample

First Name,Last Name,Job Titles,Department,Full or Part-Time,Salary or Hourly,Typical Hours,Annual Salary,Hourly Rate


dubert,tomasz ,paramedic i/c,fire,f,salary,,91080.00,
edwards,tim p,lieutenant,fire,f,salary,,114846.00,
elkins,eric j,sergeant,police,f,salary,,104628.00,
estrada,luis f,police officer,police,f,salary,,96060.00,
ewing,marie a,clerk iii,police,f,salary,,53076.00,
finn,sean p,firefighter,fire,f,salary,,87006.00,
fitch,jordan m,law clerk,law,f,hourly,35,,14.51

Let's group the data by department and then sort it first on the salary (highest first) and then on the first name.

Let's create key and value classes for the employee data. The Employee_Key class implements the Comparable<Employee_Key> interface so Spark knows how to sort our custom key.


import java.io.Serializable;

public class Employee_Key implements Comparable<Employee_Key>, Serializable {

    private static final long serialVersionUID = 3232323;
    private String name;
    private String department;
    private double salary;

    public Employee_Key(String name, String department, double salary) {
        super();
        this.name = name;
        this.department = department;
        this.salary = salary;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getDepartment() {
        return department;
    }

    public void setDepartment(String department) {
        this.department = department;
    }

    public double getSalary() {
        return salary;
    }

    public void setSalary(double salary) {
        this.salary = salary;
    }

    @Override
    public int compareTo(Employee_Key emp1) {

        // Sort on salary in descending order first; Double.compare avoids the
        // precision loss and overflow risk of casting the salaries to int.
        int compare = Double.compare(emp1.getSalary(), this.getSalary());

        // Break ties on salary by sorting on the first name in ascending order.
        if (compare == 0) {
            compare = this.getName().compareTo(emp1.getName());
        }

        return compare;
    }

}


import java.io.Serializable;

public class Employee_Value implements Serializable {

    private static final long serialVersionUID = 4343434;
    private String designation;
    private String lastname;
    private String jobtype;

    public Employee_Value(String designation, String lastname, String jobtype) {
        super();
        this.designation = designation;
        this.lastname = lastname;
        this.jobtype = jobtype;
    }

    public String getDesignation() {
        return designation;
    }

    public void setDesignation(String designation) {
        this.designation = designation;
    }

    public String getLastname() {
        return lastname;
    }

    public void setLastname(String lastname) {
        this.lastname = lastname;
    }

    public String getJobtype() {
        return jobtype;
    }

    public void setJobtype(String jobtype) {
        this.jobtype = jobtype;
    }

}
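
To illustrate the ordering that Employee_Key.compareTo defines, here is a quick local check outside of Spark (the class name EmployeeKeyOrderingDemo is just illustrative). Sorting a small list of keys built from the sample data puts the higher salary first and breaks ties on the first name:


import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class EmployeeKeyOrderingDemo {

    public static void main(String[] args) {

        List<Employee_Key> keys = new ArrayList<>();
        keys.add(new Employee_Key("dubert", "fire", 91080.00));
        keys.add(new Employee_Key("edwards", "fire", 114846.00));
        keys.add(new Employee_Key("finn", "fire", 87006.00));

        // Uses Employee_Key.compareTo: salary descending, then first name ascending.
        Collections.sort(keys);

        for (Employee_Key key : keys) {
            System.out.println(key.getDepartment() + " " + key.getSalary() + " " + key.getName());
        }
    }

}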

Our key is a custom object with three fields, so simply partitioning by the full key won't work for us. We need to create a custom partitioner that knows which field of the key to use when determining the partition the data will flow to. Here we partition on the department field only.


public class CustomEmployeePartitioner extends org.apache.spark.Partitioner {

    private static final long serialVersionUID = 343434;
    private int numPartitions;

    public int getNumPartitions() {
        return numPartitions;
    }

    public void setNumPartitions(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    public CustomEmployeePartitioner(int numPartitions) {
        super();
        this.numPartitions = numPartitions;
    }

    @Override
    public int getPartition(Object key) {

        // Partition only on the department field of the composite key so that
        // all records of a department end up in the same partition.
        Employee_Key emp = (Employee_Key) key;

        return Math.abs(emp.getDepartment().hashCode() % getNumPartitions());
    }

    @Override
    public int numPartitions() {
        return getNumPartitions();
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof CustomEmployeePartitioner) {
            CustomEmployeePartitioner partitionerObject = (CustomEmployeePartitioner) obj;
            if (partitionerObject.getNumPartitions() == this.getNumPartitions())
                return true;
        }

        return false;
    }

    @Override
    public int hashCode() {
        // Overridden together with equals, since Spark compares partitioners during shuffles.
        return Integer.hashCode(numPartitions);
    }

}
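
As a quick sanity check (illustrative only, not part of the job; PartitionerDemo is an assumed name), two keys from the same department map to the same partition index regardless of salary or name:


public class PartitionerDemo {

    public static void main(String[] args) {

        CustomEmployeePartitioner partitioner = new CustomEmployeePartitioner(50);

        // Both keys belong to the "fire" department, so they land in the same partition.
        int p1 = partitioner.getPartition(new Employee_Key("dubert", "fire", 91080.00));
        int p2 = partitioner.getPartition(new Employee_Key("edwards", "fire", 114846.00));

        System.out.println(p1 == p2); // prints true
    }

}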

Let's code the driver class. We use the method repartitionAndSortWithinPartitions, passing our custom partitioner, which repartitions the data and also sorts it within each partition based on the Comparable implementation of our key.


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class Driver {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("pattern").setMaster("local");

        JavaSparkContext jsc = new JavaSparkContext(conf);

        // Assumes the input file contains data rows only, with no header line.
        JavaRDD<String> rdd = jsc.textFile("C:\\codebase\\scala-project\\inputdata\\employee");

        // Map each csv line into a (composite key, value) pair.
        JavaPairRDD<Employee_Key, Employee_Value> pair = rdd
                .mapToPair(new PairFunction<String, Employee_Key, Employee_Value>() {

                    @Override
                    public Tuple2<Employee_Key, Employee_Value> call(String value) throws Exception {

                        String[] field = value.split(",", -1);

                        return new Tuple2<Employee_Key, Employee_Value>(
                                new Employee_Key(field[0], field[3],
                                        field[7].length() > 0 ? Double.parseDouble(field[7]) : 0.0),
                                new Employee_Value(field[2], field[1], field[4]));
                    }
                });

        // Partition by department and sort each partition by salary and then first name.
        JavaPairRDD<Employee_Key, Employee_Value> output = pair
                .repartitionAndSortWithinPartitions(new CustomEmployeePartitioner(50));

        for (Tuple2<Employee_Key, Employee_Value> data : output.collect()) {

            System.out.println(data._1.getDepartment() + " " + data._1.getSalary() + " " + data._1.getName() + " "
                    + data._2.getDesignation() + " " + data._2.getJobtype() + " " + data._2.getLastname());
        }

        jsc.close();
    }

}

We can also achieve the same result using the DataFrame API, which is much simpler. Below is an example of the same.


import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DataSetApi {

    public static void main(String[] args) {

        SparkSession session = SparkSession.builder().appName("Test").config("spark.sql.sources.default", "json")
                .master("local").getOrCreate();

        // Assumes the input file contains data rows only, with no header line.
        Dataset<Row> dataframe = session.read().option("inferSchema", "true")
                .csv("input location")
                .toDF("fname", "lname", "designation", "department", "jobtype", "NA", "NA2", "salary", "NA3");

        dataframe.sort(col("department"), col("salary"), col("fname")).show(300);
    }

}
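
Note that dataframe.sort produces a single global ordering rather than a per-partition ordering. If we want the DataFrame version to mirror the RDD approach more closely (department-wise partitions, each sorted internally), a minimal sketch under the same assumed column names could use repartition together with sortWithinPartitions (DataSetPartitionedSort is an illustrative class name):


import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DataSetPartitionedSort {

    public static void main(String[] args) {

        SparkSession session = SparkSession.builder().appName("Test").master("local").getOrCreate();

        // Assumes the input file contains data rows only, with no header line.
        Dataset<Row> dataframe = session.read().option("inferSchema", "true")
                .csv("input location")
                .toDF("fname", "lname", "designation", "department", "jobtype", "NA", "NA2", "salary", "NA3");

        // Shuffle all rows of a department into the same partition, then sort each
        // partition by salary and first name without performing a global sort.
        dataframe.repartition(col("department"))
                .sortWithinPartitions(col("salary"), col("fname"))
                .show(300);
    }

}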