The groupBy function works on unpaired data, or on paired data where we want to use a condition other than equality on the current key. It takes a function that is applied to every element in the source RDD and uses the result to determine the key.
Below is the sample data we are using.
Spider Man,4,978301398
Spider Man,4,978302091
Bat Man,5,978298709
Bat Man,4,978299000
In Scala
In the example below, index 0 of each record is the movie name, so we use the movie name as the key to group the dataset. The groupBy function returns an RDD[(K, Iterable[String])], where K is the key and the Iterable holds the values associated with that key. After the groupBy we pass the RDD into the collect method, which returns an Array[(String, Iterable[String])]. The key can be accessed with _1 and the iterable with _2. We calculate the average rating for each movie by iterating over the values for a given key and dividing the sum of the ratings by the number of reviews.
val conf = new SparkConf().setAppName("scala spark").setMaster("local")
val sc = new SparkContext(conf)
val rdd = sc.textFile("C:\\codebase\\scala-project\\input data\\movies_data_2")

// Group the records by the movie name (index 0 of each comma-separated line)
val groupRdd = rdd.groupBy { x => x.split(",")(0) }

// For every movie, sum the ratings (index 1) and divide by the number of reviews
groupRdd.collect().foreach(f => {
  val it = f._2.iterator
  var sum = 0.0d
  var count = 0
  while (it.hasNext) {
    sum += it.next().split(",")(1).toDouble
    count += 1
  }
  println(f._1 + " " + sum / count)
})
In Java
SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
JavaRDD<String> rdd = jsc.textFile("C:\\codebase\\scala-project\\input data\\movies_data_2");

// Group the records by the movie name (index 0 of each comma-separated line)
JavaPairRDD<String, Iterable<String>> group = rdd.groupBy(new Function<String, String>() {

    @Override
    public String call(String arg0) throws Exception {
        String[] data = arg0.split(",");
        return data[0];
    }
});

// For every movie, sum the ratings (index 1) and divide by the number of reviews
for (Tuple2<String, Iterable<String>> string : group.collect()) {
    Iterable<String> it = string._2;
    Iterator<String> iterator = it.iterator();
    int count = 0;
    double sum = 0;
    while (iterator.hasNext()) {
        sum += Double.parseDouble(iterator.next().split(",")[1]);
        count++;
    }
    System.out.println("Average rating " + string._1 + " - " + sum / count);
}
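For the four sample records above, both the Scala and Java versions compute the same result: Spider Man averages (4 + 4) / 2 = 4.0 and Bat Man averages (5 + 4) / 2 = 4.5. The Scala version, for example, should print output along these lines (the key order may vary between runs):

Spider Man 4.0
Bat Man 4.5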
groupByKey
If our data is already keyed in the way we want, groupByKey() will group our data using the key in our RDD. On an RDD consisting of keys of type K and values of type V, we get back an RDD of type RDD[(K, Iterable[V])].
Below is the sample data we are using; let's calculate the average rating for each movie by iterating over the values for a given key and dividing the sum of the ratings by the number of reviews.
Spider Man,4,978301398
Spider Man,4,978302091
Bat Man,5,978298709
Bat Man,4,978299000
In Java
SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
JavaRDD<String> rdd = jsc.textFile("C:\\codebase\\scala-project\\input data\\movies_data_2");

// Key each record by the movie name and use the rating as the value
JavaPairRDD<String, Double> pairRdd = rdd.mapToPair(new PairFunction<String, String, Double>() {

    @Override
    public Tuple2<String, Double> call(String str) throws Exception {
        String[] data = str.split(",", -1);
        return new Tuple2<String, Double>(data[0], Double.parseDouble(data[1]));
    }
});

// Group all ratings under their movie name
JavaPairRDD<String, Iterable<Double>> groupRdd = pairRdd.groupByKey();

// Average the ratings for every movie
for (Tuple2<String, Iterable<Double>> tuple : groupRdd.collect()) {
    Iterator<Double> it = tuple._2.iterator();
    double sum = 0.0;
    int count = 0;
    while (it.hasNext()) {
        sum += it.next();
        count++;
    }
    System.out.println(tuple._1 + " - " + sum / count);
}
In Scala
val conf = new SparkConf().setAppName("scala spark").setMaster("local")
val sc = new SparkContext(conf)
val rdd = sc.textFile("C:\\codebase\\scala-project\\input data\\movies_data_2")

// Key each record by the movie name and use the rating as the value
val pairRdd = rdd.map { x =>
  val str = x.split(",")
  new Tuple2(str(0), str(1))
}

// Group all ratings under their movie name and average them
pairRdd.groupByKey().collect().foreach(x => {
  val ite = x._2.iterator
  var sum = 0d
  var count = 0
  while (ite.hasNext) {
    sum += ite.next().toDouble
    count += 1
  }
  println("Group By Key " + x._1 + " " + sum / count)
})
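If we prefer to avoid the explicit iterator loop, the same average can be computed with mapValues on the grouped RDD. This is just a minimal alternative sketch, assuming the pairRdd defined in the example above (where the rating is still kept as a String):

// Equivalent average computation without an explicit iterator loop
pairRdd.groupByKey()
  .mapValues { ratings =>
    val values = ratings.map(_.toDouble)
    values.sum / values.size
  }
  .collect()
  .foreach { case (movie, avg) => println("Group By Key " + movie + " " + avg) }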
We can also pass an implementation of a custom Partitioner into groupByKey if required. Below is an example in Java, followed by a sketch of the same idea in Scala.
JavaPairRDD<String, Iterable<Double>> groupPartitionRdd = pairRdd.groupByKey(new Partitioner() {

    @Override
    public int getPartition(Object arg0) {
        // Assign a partition based on the hash of the key
        return Math.abs(arg0.toString().hashCode()) % numPartitions();
    }

    @Override
    public int numPartitions() {
        return 7;
    }
});
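The same thing can be done in Scala by extending org.apache.spark.Partitioner and passing an instance into groupByKey. This is a minimal sketch, assuming the pairRdd from the Scala groupByKey example above; the class name MoviePartitioner is just a name chosen for illustration:

import org.apache.spark.Partitioner

// Hash-based partitioner that spreads the keys across 7 partitions
class MoviePartitioner extends Partitioner {
  override def numPartitions: Int = 7
  override def getPartition(key: Any): Int =
    Math.abs(key.toString.hashCode) % numPartitions
}

// Pass the custom partitioner into groupByKey
val groupPartitionRdd = pairRdd.groupByKey(new MoviePartitioner)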
cogroup and groupWith
We can group data sharing the same key from multiple RDDs using the functions cogroup() and groupWith(). cogroup() over two RDDs sharing the same key type, K, with the respective value types V and W, gives us back an RDD[(K, (Iterable[V], Iterable[W]))]. If one of the RDDs doesn't have elements for a given key that is present in the other RDD, the corresponding Iterable is simply empty. cogroup() gives us the power to group data from multiple RDDs.
groupWith() is functionally equivalent to cogroup(); it is just an alias for cogroup(), so we can use either of these functions.
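Before the file-based examples below, here is a minimal self-contained Scala sketch that shows the shape of the cogroup() result; the in-memory data and the genre value are made up for illustration, and it assumes an existing SparkContext sc:

// Two small pair RDDs sharing the key type String (made-up data)
val ratings = sc.parallelize(Seq(("Spider Man", 4.0), ("Bat Man", 5.0)))
val genres = sc.parallelize(Seq(("Spider Man", "Action")))

// cogroup returns RDD[(String, (Iterable[Double], Iterable[String]))]
ratings.cogroup(genres).collect().foreach(println)
// prints something like:
// (Spider Man,(CompactBuffer(4.0),CompactBuffer(Action)))
// (Bat Man,(CompactBuffer(5.0),CompactBuffer()))   <- empty Iterable for the missing genre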
In Java
SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
JavaRDD<String> rdd = jsc.textFile("C:\\codebase\\scala-project\\input data\\movies_data_2");
JavaRDD<String> rdd2 = jsc.textFile("C:\\codebase\\scala-project\\input data\\movies_data");

// Key the ratings dataset by the movie name with the rating as the value
JavaPairRDD<String, Double> pairRdd = rdd.mapToPair(new PairFunction<String, String, Double>() {

    @Override
    public Tuple2<String, Double> call(String str) throws Exception {
        String[] data = str.split(",", -1);
        return new Tuple2<String, Double>(data[0], Double.parseDouble(data[1]));
    }
});

// Key the movies dataset by the movie name with the genre as the value
JavaPairRDD<String, String> pairRdd2 = rdd2.mapToPair(new PairFunction<String, String, String>() {

    @Override
    public Tuple2<String, String> call(String str) throws Exception {
        String[] data = str.split(",", -1);
        return new Tuple2<String, String>(data[1], data[2]);
    }
});

// groupWith and cogroup are aliases and produce the same result
JavaPairRDD<String, Tuple2<Iterable<Double>, Iterable<String>>> groupPartitionRdd2 = pairRdd
        .groupWith(pairRdd2);
JavaPairRDD<String, Tuple2<Iterable<Double>, Iterable<String>>> cogroupPartition = pairRdd
        .cogroup(pairRdd2);

// Print every rating for a movie along with its genre
for (Tuple2<String, Tuple2<Iterable<Double>, Iterable<String>>> string : cogroupPartition.collect()) {
    String key = string._1;
    String genre = null;
    Tuple2<Iterable<Double>, Iterable<String>> value = string._2;
    Iterator<Double> first = value._1.iterator();
    Iterator<String> second = value._2.iterator();
    while (second.hasNext()) {
        genre = second.next();
    }
    while (first.hasNext()) {
        System.out.println(key + " - " + first.next() + "," + genre);
    }
}
In Scala
val conf = new SparkConf().setAppName("scala spark").setMaster("local")
val sc = new SparkContext(conf)
val rdd = sc.textFile("C:\\codebase\\scala-project\\input data\\movies_data_2")
val rdd2 = sc.textFile("C:\\codebase\\scala-project\\input data\\movies_data")

// Key the ratings dataset by the movie name with the rating as the value
val pairRdd = rdd.map { x =>
  val str = x.split(",")
  new Tuple2(str(0), str(1))
}

// Key the movies dataset by the movie name with the genre as the value
val pairRdd2 = rdd2.map { x =>
  val str = x.split(",")
  new Tuple2(str(1), str(2))
}

// Print every rating for a movie along with its genre using cogroup
pairRdd.cogroup(pairRdd2).foreach(f => {
  val it2 = f._2._1.iterator
  val it3 = f._2._2.iterator
  var genre = ""
  while (it3.hasNext) {
    genre = it3.next()
  }
  while (it2.hasNext) {
    println(f._1 + " " + it2.next() + " " + genre)
  }
})

// groupWith is an alias for cogroup and produces the same output
pairRdd.groupWith(pairRdd2).foreach(f => {
  val it2 = f._2._1.iterator
  val it3 = f._2._2.iterator
  var genre = ""
  while (it3.hasNext) {
    genre = it3.next()
  }
  while (it2.hasNext) {
    println(f._1 + " " + it2.next() + " " + genre)
  }
})
Additionally, cogroup() and groupWith() can work on up to four RDDs at once. Below is an example in Java, followed by a Scala sketch.
JavaPairRDD<String, Tuple4<Iterable<Double>, Iterable<String>, Iterable<Double>, Iterable<String>>> groupPartitionRddMulti =
        pairRdd.groupWith(pairRdd2, pairRdd, pairRdd2);
JavaPairRDD<String, Tuple4<Iterable<Double>, Iterable<String>, Iterable<Double>, Iterable<String>>> cogroupPartitionMulti =
        pairRdd.cogroup(pairRdd2, pairRdd, pairRdd2);
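The Scala API supports the same multi-RDD grouping. A minimal sketch, assuming the pairRdd and pairRdd2 from the Scala cogroup example above:

// cogroup with three other RDDs returns four Iterables per key:
// RDD[(String, (Iterable[String], Iterable[String], Iterable[String], Iterable[String]))]
val multiCogroup = pairRdd.cogroup(pairRdd2, pairRdd, pairRdd2)
multiCogroup.collect().foreach(println)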