Spark custom comparator example for sortByKey in Java and Scala – tutorial 8

Sometimes we want a different sort order entirely, and to support this we can provide our own comparison function.

In Java


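import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
JavaRDD<String> rdd = jsc.textFile("C:\\codebase\\scala-project\\input data\\movies_data_2");

// Parse each comma-separated line into a (Movie, 1) pair keyed by the movie.
JavaPairRDD<Movie, Integer> pairRddCustomSort = rdd.mapToPair(new PairFunction<String, Movie, Integer>() {
    @Override
    public Tuple2<Movie, Integer> call(String str) throws Exception {
        // The -1 limit keeps trailing empty fields.
        String[] data = str.split(",", -1);
        return new Tuple2<Movie, Integer>(new Movie(data[0], Double.parseDouble(data[1]), data[2]), 1);
    }
});

// Sort by key using the custom comparator rather than a natural ordering.
JavaPairRDD<Movie, Integer> pairRddCustomSorted = pairRddCustomSort.sortByKey(new MovieComparator());

System.out.println("------------Sorted---------------------------");

for (Tuple2<Movie, Integer> pair : pairRddCustomSorted.collect()) {
    System.out.println(pair._1.getName() + " " + pair._1.getRating());
}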

The Movie class


import java.io.Serializable;

public class Movie implements Serializable {

    private String name;
    private Double rating;
    private String timestamp;

    public Movie(String name, Double rating, String timestamp) {
        this.name = name;
        this.rating = rating;
        this.timestamp = timestamp;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Double getRating() {
        return rating;
    }

    public void setRating(Double rating) {
        this.rating = rating;
    }

    public String getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(String timestamp) {
        this.timestamp = timestamp;
    }

}

The MovieComparator class


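import java.io.Serializable;
import java.util.Comparator;

// Serializable because Spark sends the comparator to the executors.
public class MovieComparator implements Comparator<Movie>, Serializable {

    @Override
    public int compare(Movie movie1, Movie movie2) {
        // Descending by rating. Double.compare keeps the fractional part, which
        // subtracting intValue() results would discard (4.2 and 4.8 would tie).
        return Double.compare(movie2.getRating(), movie1.getRating());
    }

}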
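
A comparator that subtracts intValue() results ignores the fractional part of the rating, so 4.2 and 4.8 compare as equal, whereas Double.compare orders them correctly. The sketch below illustrates the difference with plain java.util lists rather than Spark. The names RatingOrderSketch, TRUNCATING, EXACT and sortedWith are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class RatingOrderSketch {

    // Descending order on the truncated integer part only (hypothetical name).
    static final Comparator<Double> TRUNCATING = (a, b) -> b.intValue() - a.intValue();

    // Descending order on the full double value (hypothetical name).
    static final Comparator<Double> EXACT = (a, b) -> Double.compare(b, a);

    // Returns a sorted copy of the given ratings; List.sort is stable,
    // so elements that compare as equal keep their input order.
    static List<Double> sortedWith(Comparator<Double> cmp, Double... ratings) {
        List<Double> copy = new ArrayList<>(Arrays.asList(ratings));
        copy.sort(cmp);
        return copy;
    }

    public static void main(String[] args) {
        // 4.2 and 4.8 both truncate to 4, so their input order is kept.
        System.out.println(sortedWith(TRUNCATING, 4.2, 4.8, 3.9)); // prints [4.2, 4.8, 3.9]
        System.out.println(sortedWith(EXACT, 4.2, 4.8, 3.9));      // prints [4.8, 4.2, 3.9]
    }
}
```

With real data the effect is that movies whose ratings share an integer part come out in an arbitrary relative order.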

In Scala

Here we define an implicit Ordering[Movie]. sortByKey looks for an implicit Ordering for the key type, so it picks this definition up automatically.


import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("scala spark").setMaster("local")
val sc = new SparkContext(conf)
val rdd = sc.textFile("C:\\codebase\\scala-project\\input data\\movies_data_2")

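// Parse each comma-separated line into a (Movie, 1) pair keyed by the movie.
val rddPair3 = rdd.map { x =>
  val data = x.split(",")
  (new Movie(data(0), data(1).toDouble, data(2)), 1)
}

// Descending by rating. java.lang.Double.compare keeps the fractional part,
// which subtracting toInt results would discard (4.2 and 4.8 would tie).
implicit val sortMovieByRating: Ordering[Movie] = new Ordering[Movie] {
  override def compare(a: Movie, b: Movie): Int = java.lang.Double.compare(b.rating, a.rating)
}

rddPair3.sortByKey().collect().foreach { x =>
  println(x._1.name + " " + x._1.rating)
}

class Movie(var name: String, var rating: Double, var time: String) extends Serializable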

We can also do secondary sorting in Spark, as in MapReduce. To do so, we define a composite key when we convert the dataset into key-value pairs. For example, if we want to sort on movie name and movie rating, the key must contain both fields. Two further pieces are then needed:

1. All the ratings of a specific movie must land in the same partition during the shuffle. Because our key is a composite of two fields, partitioning on the whole key won’t work for us. We need a custom partitioner that uses only the movie name to determine which partition a record flows to.

2. We also need to tell Spark how we want the data sorted: by movie name first, and then by rating.

So we need a custom partitioner and a custom comparator. In the driver code we then call rddpair.repartitionAndSortWithinPartitions(Custom_partitioner_implementation). In Scala the comparator is supplied as an implicit Ordering on the key type; in Java it can be passed as a second argument.
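
The partition-then-sort logic described above can be sketched in plain Java without Spark. This is only an illustration under stated assumptions: MovieKey, partitionFor, NAME_THEN_RATING and partitionAndSort are hypothetical names, ratings are sorted highest first, and the bucketing loop merely imitates what repartitionAndSortWithinPartitions does across a cluster. In a real job the body of partitionFor would go into getPartition(Object key) of a class extending org.apache.spark.Partitioner, and the comparator would be passed alongside it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class SecondarySortSketch {

    // Hypothetical composite key: movie name plus rating.
    static final class MovieKey {
        final String name;
        final double rating;

        MovieKey(String name, double rating) {
            this.name = name;
            this.rating = rating;
        }

        @Override
        public String toString() {
            return name + ":" + rating;
        }
    }

    // Partition on the name only, so every rating of a movie shares a partition.
    // floorMod keeps the result non-negative even for negative hash codes.
    static int partitionFor(MovieKey key, int numPartitions) {
        return Math.floorMod(key.name.hashCode(), numPartitions);
    }

    // Sort by name first, then by rating, highest first (an assumption).
    static final Comparator<MovieKey> NAME_THEN_RATING =
        Comparator.comparing((MovieKey k) -> k.name)
                  .thenComparing((MovieKey a, MovieKey b) -> Double.compare(b.rating, a.rating));

    // Imitates the Spark operation locally: bucket by partition, then sort each bucket.
    static List<List<MovieKey>> partitionAndSort(List<MovieKey> keys, int numPartitions) {
        List<List<MovieKey>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (MovieKey key : keys) {
            partitions.get(partitionFor(key, numPartitions)).add(key);
        }
        for (List<MovieKey> partition : partitions) {
            partition.sort(NAME_THEN_RATING);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<MovieKey> keys = Arrays.asList(
            new MovieKey("A", 3.0), new MovieKey("B", 4.5),
            new MovieKey("A", 4.8), new MovieKey("B", 2.0));
        System.out.println(partitionAndSort(keys, 2));
    }
}
```

Partitioning on the full key would instead scatter the ratings of one movie across partitions, which is why the partitioner looks at the name alone while the comparator looks at both fields.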
