We can sort an RDD with key/value pairs provided that there is an ordering defined on the key. Once we have sorted our data, any subsequent call on the sorted data to collect() or save() will result in ordered data.
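For example, here is a minimal sketch of that idea (using a small made-up pair RDD, not the dataset from this post): sortByKey can be called on any RDD of tuples whose key type has an ordering, and collect() then returns the elements in key order.

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("sort example").setMaster("local")
val sc = new SparkContext(conf)

// A small pair RDD; String keys have a natural ordering, so sortByKey works on it
val pairs = sc.parallelize(Seq(("Spider Man", 4), ("Bat Man", 5), ("Ant Man", 3)))

// collect() on the sorted RDD returns the elements in key order
pairs.sortByKey().collect().foreach(println)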
Using sortBy on a normal RDD
We will use the dataset below:
Spider Man,4,978302174
Spider Man,4,978301398
Spider Man,4,978302091
Bat Man,5,978298709
Bat Man,4,978299000
Let's sort the data based on the rating field.
In Java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
JavaRDD<String> rdd = jsc.textFile("C:\\codebase\\scala-project\\input data\\movies_data_2");

/* The key function below returns the rating field as the sort key. The second
   parameter controls the order: true sorts in ascending order, false in
   descending order. The third parameter is the number of partitions. */
JavaRDD<String> sortedR = rdd.sortBy(new Function<String, String>() {

    @Override
    public String call(String arg0) throws Exception {
        return arg0.split(",")[1];
    }
}, false, 10);

for (String string : sortedR.collect()) {
    System.out.println(string);
}
In Scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("scala spark").setMaster("local")
val sc = new SparkContext(conf)
val rdd = sc.textFile("C:\\codebase\\scala-project\\input data\\movies_data_2")

// Sort by the rating field in descending order across 10 partitions
rdd.sortBy(f => f.split(",")(1), false, 10).collect().foreach { x =>
  println("--" + x)
}
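Note that in both snippets the sort key returned by the key function is the rating as a String, so the ordering is lexicographic. With the single-digit ratings in this dataset that happens to match numeric order, but for general numeric sorting the key can be converted first, for example (a variant of the Scala snippet above):

// Sort numerically by converting the rating field to a Double
rdd.sortBy(f => f.split(",")(1).toDouble, false, 10).collect().foreach { x =>
  println("--" + x)
}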
sortByKey function
Let's find the average rating of each movie and then sort the data by that average.
In Java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
JavaRDD<String> rdd = jsc.textFile("C:\\codebase\\scala-project\\input data\\movies_data_2");

// Map each line to a (movie, AverageRating) pair
JavaPairRDD<String, AverageRating> pairRdd = rdd.mapToPair(new PairFunction<String, String, AverageRating>() {

    @Override
    public Tuple2<String, AverageRating> call(String str) throws Exception {
        String[] data = str.split(",", -1);
        return new Tuple2<String, AverageRating>(data[0], new AverageRating(1, Double.parseDouble(data[1])));
    }
});

// Sum the ratings and counts per movie
JavaPairRDD<String, AverageRating> reduce = pairRdd
        .reduceByKey(new Function2<AverageRating, AverageRating, AverageRating>() {

            @Override
            public AverageRating call(AverageRating arg0, AverageRating arg1) throws Exception {
                arg0.setSum(arg0.getSum() + arg1.getSum());
                arg0.setCount(arg0.getCount() + arg1.getCount());
                return arg0;
            }
        });

// Swap the pair so the average rating becomes the key we can sort on
JavaPairRDD<Double, String> rating = reduce
        .mapToPair(new PairFunction<Tuple2<String, AverageRating>, Double, String>() {

            @Override
            public Tuple2<Double, String> call(Tuple2<String, AverageRating> arg0) throws Exception {
                return new Tuple2<Double, String>(arg0._2.getSum() / arg0._2.getCount(), arg0._1);
            }
        });

// Sort by average rating in descending order
JavaPairRDD<Double, String> sorted = rating.sortByKey(false);

for (Tuple2<Double, String> tuple : sorted.collect()) {
    System.out.println(tuple._2 + " " + tuple._1);
}
In Scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("scala spark").setMaster("local")
val sc = new SparkContext(conf)
val rdd = sc.textFile("C:\\codebase\\scala-project\\input data\\movies_data_2")

// Map each line to a (movie, AverageRating) pair
val rddPair1 = rdd.map { x =>
  val data = x.split(",")
  (data(0), new AverageRating(1, data(1).toDouble))
}

rddPair1.reduceByKey((x, y) => {
  // Accumulate the rating sum and the number of ratings per movie
  x.sum += y.sum
  x.count += y.count
  x
}).map(f => {
  // Swap to (averageRating, movie) so we can sort on the rating
  (f._2.sum / f._2.count, f._1)
}).sortByKey(false, 5).collect().foreach(f => {
  println(f._2 + " " + f._1)
})
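Both versions rely on an AverageRating helper class that is not shown in this post. A minimal Scala sketch, assuming it is just a mutable holder for the number of ratings and their running sum, could look like this (the Java version would need an equivalent POJO with getters and setters):

// Assumed shape of the AverageRating helper used above: a serializable,
// mutable holder for the count of ratings seen and their running sum.
class AverageRating(var count: Int, var sum: Double) extends Serializable

With the sample dataset above, the output should list Bat Man first with an average of 4.5, followed by Spider Man with 4.0.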