Spark sortBy and sortByKey example in Java and Scala – Tutorial 7

We can sort an RDD of key/value pairs provided there is an ordering defined on the key. Once the data is sorted, any subsequent call to collect() or save() on it will return the records in order.
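As a quick illustration of the idea, here is a minimal, hypothetical Scala sketch that sorts a small key/value RDD by its keys (the app name and the in-memory data are made up for the example):


import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

val conf = new SparkConf().setAppName("sort demo").setMaster("local")
val sc = new SparkContext(conf)

// A tiny key/value RDD; String keys have a natural ordering, so sortByKey applies
val pairs = sc.parallelize(Seq(("Spider Man", 4), ("Bat Man", 5)))

// Ascending by key; collect() now returns the pairs in key order
pairs.sortByKey().collect().foreach(println)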

Using sortBy on a normal RDD

We will use the dataset below:


Spider Man,4,978302174
Spider Man,4,978301398
Spider Man,4,978302091
Bat Man,5,978298709
Bat Man,4,978299000

Let's sort the data by rating.

In Java


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
JavaRDD<String> rdd = jsc.textFile("C:\\codebase\\scala-project\\input data\\movies_data_2");

/* If the second parameter is true the sort is ascending, otherwise descending.
   The third parameter is the number of partitions. */

JavaRDD<String> sortedRdd = rdd.sortBy(new Function<String, String>() {

    @Override
    public String call(String line) throws Exception {
        // Sort on the rating field (the second column)
        return line.split(",")[1];
    }
}, false, 10);

for (String line : sortedRdd.collect()) {
    System.out.println(line);
}

In Scala


import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("scala spark").setMaster("local")
val sc = new SparkContext(conf)
val rdd = sc.textFile("C:\\codebase\\scala-project\\input data\\movies_data_2")

// Sort on the rating field (the second column), descending, across 10 partitions
rdd.sortBy(line => line.split(",")(1), false, 10)
  .collect()
  .foreach(line => println("--" + line))
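Note that the sort key returned above is the rating field as a string, so the ordering is lexicographic. That is fine for the single-digit ratings in this dataset, but if you want a true numeric ordering you could convert the field first, for example:


// Hypothetical variant: parse the rating so the ordering is numeric rather than lexicographic
rdd.sortBy(line => line.split(",")(1).toInt, false, 10)
  .collect()
  .foreach(line => println("--" + line))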
sortByKey function

Let's find the average rating of each movie and then sort the data by that average.
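Both versions below rely on an AverageRating helper class that keeps a running count of ratings and their sum. The class is not shown in the original, so here is a minimal sketch of what it could look like; the @BeanProperty annotation also generates the getCount/setCount and getSum/setSum methods used by the Java version:


import scala.beans.BeanProperty

// Hypothetical helper: a mutable holder for the number of ratings seen so far
// and their running sum, so the average can be computed as sum / count.
class AverageRating(@BeanProperty var count: Int, @BeanProperty var sum: Double)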

In Java


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);

JavaRDD<String> rdd = jsc.textFile("C:\\codebase\\scala-project\\input data\\movies_data_2");

// Map each line to a (movie, AverageRating) pair holding one rating
JavaPairRDD<String, AverageRating> pairRdd = rdd.mapToPair(new PairFunction<String, String, AverageRating>() {

    @Override
    public Tuple2<String, AverageRating> call(String str) throws Exception {
        String[] data = str.split(",", -1);
        return new Tuple2<String, AverageRating>(data[0], new AverageRating(1, Double.parseDouble(data[1])));
    }
});

// Add up the rating sums and counts per movie
JavaPairRDD<String, AverageRating> reduced = pairRdd
        .reduceByKey(new Function2<AverageRating, AverageRating, AverageRating>() {

            @Override
            public AverageRating call(AverageRating arg0, AverageRating arg1) throws Exception {
                arg0.setSum(arg0.getSum() + arg1.getSum());
                arg0.setCount(arg0.getCount() + arg1.getCount());
                return arg0;
            }
        });

// Swap the pair so the average rating becomes the key we can sort on
JavaPairRDD<Double, String> rating = reduced
        .mapToPair(new PairFunction<Tuple2<String, AverageRating>, Double, String>() {

            @Override
            public Tuple2<Double, String> call(Tuple2<String, AverageRating> arg0) throws Exception {
                return new Tuple2<Double, String>(arg0._2.getSum() / arg0._2.getCount(), arg0._1);
            }
        });

// Sort by the average rating in descending order
JavaPairRDD<Double, String> sorted = rating.sortByKey(false);

for (Tuple2<Double, String> tuple : sorted.collect()) {
    System.out.println(tuple._2 + " " + tuple._1);
}

In Scala


import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("scala spark").setMaster("local")
val sc = new SparkContext(conf)
val rdd = sc.textFile("C:\\codebase\\scala-project\\input data\\movies_data_2")
// Map each line to a (movie, AverageRating) pair holding one rating
val rddPair1 = rdd.map { line =>
  val data = line.split(",")
  (data(0), new AverageRating(1, data(1).toDouble))
}

rddPair1.reduceByKey((x, y) => {
  // Add up the rating sums and counts per movie
  x.sum += y.sum
  x.count += y.count
  x
}).map(f => {
  // Swap the pair so the average rating becomes the sort key
  (f._2.sum / f._2.count, f._1)
}).sortByKey(false, 5).collect().foreach(f => {
  println(f._2 + " " + f._1)
})
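With the sample dataset above, Bat Man averages (5 + 4) / 2 = 4.5 and Spider Man averages 4.0, so both versions should print Bat Man first, followed by Spider Man.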
