Spark groupBy, groupByKey, cogroup and groupWith example in Java and Scala – Tutorial 5

The groupBy function works on unpaired data, or on data where we want to group by a condition other than equality on the existing key. It takes a function that it applies to every element in the source RDD and uses the result to determine the key.
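For example, here is a minimal sketch (assuming a SparkContext sc and made-up numbers) that groups unpaired data by a derived condition, bucketing ratings into "high" and "low" rather than grouping on an existing key:


val ratings = sc.parallelize(Seq(5, 2, 4, 1))
val buckets = ratings.groupBy(r => if (r >= 3) "high" else "low")
// buckets: RDD[(String, Iterable[Int])]
// e.g. (high,CompactBuffer(5, 4)), (low,CompactBuffer(2, 1))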

Below is the sample data we are using:


Spider Man,4,978301398
Spider Man,4,978302091
Bat Man,5,978298709
Bat Man,4,978299000

In Scala

In the example below, index 0 holds the movie name, so we use the movie name as the key to group the dataset. The groupBy function returns an RDD[(K, Iterable[String])], where K is the key and the Iterable holds the values associated with that key. After the group by we pass the RDD to the collect method, which returns an Array[(String, Iterable[String])]. The key can be accessed with _1 and the iterable with _2. Here we calculate the average rating for each movie by iterating over the values for a given key and dividing the sum of the ratings by the number of reviews.


import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("scala spark").setMaster("local")
val sc = new SparkContext(conf)
val rdd = sc.textFile("C:\\codebase\\scala-project\\input data\\movies_data_2")

// Group each line by the movie name (index 0)
val groupRdd = rdd.groupBy { x =>
  x.split(",")(0)
}

groupRdd.collect().foreach(f => {

  val it = f._2.iterator
  var sum = 0.0d
  var count = 0

  while (it.hasNext) {
    // Index 1 holds the rating
    sum += it.next().split(",")(1).toDouble
    count += 1
  }

  println(f._1 + " " + sum / count)
})
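With the sample data above, this prints the following (ordering across keys may vary):


Spider Man 4.0
Bat Man 4.5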

In Java


import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import scala.Tuple2;

SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
JavaRDD<String> rdd = jsc.textFile("C:\\codebase\\scala-project\\input data\\movies_data_2");

// Group each line by the movie name (index 0)
JavaPairRDD<String, Iterable<String>> group = rdd.groupBy(new Function<String, String>() {

    @Override
    public String call(String arg0) throws Exception {
        String[] data = arg0.split(",");
        return data[0];
    }
});

for (Tuple2<String, Iterable<String>> string : group.collect()) {

    Iterator<String> iterator = string._2.iterator();
    int count = 0;
    double sum = 0;

    while (iterator.hasNext()) {
        // Index 1 holds the rating
        sum += Double.parseDouble(iterator.next().split(",")[1]);
        count++;
    }

    System.out.println("Average rating " + string._1 + " - " + sum / count);
}

groupByKey

If our data is already keyed the way we want, groupByKey() will group it using the key in our RDD. On an RDD consisting of keys of type K and values of type V, we get back an RDD of type RDD[(K, Iterable[V])].
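As a minimal sketch (assuming a SparkContext sc and made-up pairs):


val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
pairs.groupByKey().collect()
// e.g. Array((a,CompactBuffer(1, 2)), (b,CompactBuffer(3)))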

Below is the sample data we are using. Let's calculate the average rating for each movie by iterating over the values for a given key and dividing the sum of the ratings by the number of reviews.


Spider Man,4,978301398
Spider Man,4,978302091
Bat Man,5,978298709
Bat Man,4,978299000

In Java


SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
JavaRDD<String> rdd = jsc.textFile("C:\\codebase\\scala-project\\input data\\movies_data_2");

JavaPairRDD<String, Double> pairRdd = rdd.mapToPair(new PairFunction<String, String, Double>() {

    @Override
    public Tuple2<String, Double> call(String str) throws Exception {
        String[] data = str.split(",", -1);
        return new Tuple2<String, Double>(data[0], Double.parseDouble(data[1]));
    }
});

JavaPairRDD<String, Iterable<Double>> groupRdd = pairRdd.groupByKey();

for (Tuple2<String, Iterable<Double>> tuple : groupRdd.collect()) {

    Iterator<Double> it = tuple._2.iterator();
    double sum = 0.0;
    int count = 0;

    while (it.hasNext()) {
        sum += it.next();
        count++;
    }

    System.out.println(tuple._1 + " - " + sum / count);
}

In Scala


val conf = new SparkConf().setAppName("scala spark").setMaster("local")
val sc = new SparkContext(conf)
val rdd = sc.textFile("C:\\codebase\\scala-project\\input data\\movies_data_2")

// Key each line by movie name, with the rating as the value
val pairRdd = rdd.map { x =>
  val str = x.split(",")
  (str(0), str(1))
}

pairRdd.groupByKey().collect().foreach(x => {

  val ite = x._2.iterator
  var sum = 0d
  var count = 0

  while (ite.hasNext) {
    sum += ite.next().toDouble
    count += 1
  }

  println("Group By Key " + x._1 + " " + sum / count)
})
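For an aggregation like this average, reduceByKey (or aggregateByKey) is generally preferable to groupByKey, since it combines values on each partition before the shuffle instead of materializing the full group of values per key. A sketch of the same average built on the pairRdd above:


// Pair each rating with a count of 1, sum both per key, then divide
val avgRdd = pairRdd
  .mapValues(v => (v.toDouble, 1))
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
  .mapValues { case (sum, count) => sum / count }

avgRdd.collect().foreach { case (movie, avg) => println(movie + " " + avg) }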

We can also pass an implementation of a custom partitioner to groupByKey if required. Below is an example in Java.


JavaPairRDD<String, Iterable<Double>> groupPartitionRdd = pairRdd.groupByKey(new Partitioner() {

    @Override
    public int getPartition(Object arg0) {
        // Spread keys across partitions by the hash of their string form
        return Math.abs(arg0.toString().hashCode()) % numPartitions();
    }

    @Override
    public int numPartitions() {
        return 7;
    }
});
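The same idea in Scala, as a sketch (the class name MoviePartitioner is made up for illustration, and pairRdd is the one from the Scala groupByKey example above):


import org.apache.spark.Partitioner

class MoviePartitioner(partitions: Int) extends Partitioner {
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int =
    // Same hash-based placement as the Java example above
    math.abs(key.toString.hashCode) % numPartitions
}

val groupPartitionRdd = pairRdd.groupByKey(new MoviePartitioner(7))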

cogroup and groupWith

We can group data sharing the same key from multiple RDDs using the functions cogroup() and groupWith(). cogroup() over two RDDs sharing the same key type, K, with respective value types V and W gives us back RDD[(K, (Iterable[V], Iterable[W]))]. If one of the RDDs doesn't have elements for a given key that is present in the other RDD, the corresponding Iterable is simply empty. cogroup() gives us the power to group data from multiple RDDs.

groupWith() is functionally equivalent to cogroup(); it is just an alias, so we can use either function.
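As a minimal sketch of the result shape (assuming a SparkContext sc and made-up pairs), note how the missing genre for one key shows up as an empty Iterable:


val ratings = sc.parallelize(Seq(("Bat Man", 5.0), ("Spider Man", 4.0)))
val genres = sc.parallelize(Seq(("Bat Man", "Action")))

ratings.cogroup(genres).collect()
// e.g. Array((Spider Man,(CompactBuffer(4.0),CompactBuffer())),
//            (Bat Man,(CompactBuffer(5.0),CompactBuffer(Action))))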

In Java


SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
JavaRDD<String> rdd = jsc.textFile("C:\\codebase\\scala-project\\input data\\movies_data_2");
JavaRDD<String> rdd2 = jsc.textFile("C:\\codebase\\scala-project\\input data\\movies_data");

JavaPairRDD<String, Double> pairRdd = rdd.mapToPair(new PairFunction<String, String, Double>() {

    @Override
    public Tuple2<String, Double> call(String str) throws Exception {
        String[] data = str.split(",", -1);
        return new Tuple2<String, Double>(data[0], Double.parseDouble(data[1]));
    }
});

JavaPairRDD<String, String> pairRdd2 = rdd2.mapToPair(new PairFunction<String, String, String>() {

    @Override
    public Tuple2<String, String> call(String str) throws Exception {
        String[] data = str.split(",", -1);
        return new Tuple2<String, String>(data[1], data[2]);
    }
});

JavaPairRDD<String, Tuple2<Iterable<Double>, Iterable<String>>> groupPartitionRdd2 = pairRdd
        .groupWith(pairRdd2);

JavaPairRDD<String, Tuple2<Iterable<Double>, Iterable<String>>> cogroupPartition = pairRdd
        .cogroup(pairRdd2);

for (Tuple2<String, Tuple2<Iterable<Double>, Iterable<String>>> string : cogroupPartition.collect()) {

    String key = string._1;
    String genre = null;
    Tuple2<Iterable<Double>, Iterable<String>> value = string._2;

    Iterator<Double> first = value._1.iterator();
    Iterator<String> second = value._2.iterator();

    // Take the last genre seen for this key (the iterable may be empty)
    while (second.hasNext()) {
        genre = second.next();
    }

    while (first.hasNext()) {
        System.out.println(key + " - " + first.next() + "," + genre);
    }
}

In Scala


val conf = new SparkConf().setAppName("scala spark").setMaster("local")
val sc = new SparkContext(conf)
val rdd = sc.textFile("C:\\codebase\\scala-project\\input data\\movies_data_2")
val rdd2 = sc.textFile("C:\\codebase\\scala-project\\input data\\movies_data")

val pairRdd = rdd.map { x =>
  val str = x.split(",")
  (str(0), str(1))
}

val pairRdd2 = rdd2.map { x =>
  val str = x.split(",")
  (str(1), str(2))
}

pairRdd.cogroup(pairRdd2).foreach(f => {

  val it2 = f._2._1.iterator
  val it3 = f._2._2.iterator
  var genre = ""

  // Take the last genre seen for this key (the iterable may be empty)
  while (it3.hasNext) {
    genre = it3.next()
  }

  while (it2.hasNext) {
    println(f._1 + " " + it2.next() + " " + genre)
  }
})

pairRdd.groupWith(pairRdd2).foreach(f => {

  val it2 = f._2._1.iterator
  val it3 = f._2._2.iterator
  var genre = ""

  while (it3.hasNext) {
    genre = it3.next()
  }

  while (it2.hasNext) {
    println(f._1 + " " + it2.next() + " " + genre)
  }
})

Additionally, cogroup() and groupWith() can work on up to four RDDs at once. Below is an example in Java.


JavaPairRDD<String, Tuple4<Iterable<Double>, Iterable<String>, Iterable<Double>, Iterable<String>>> groupPartitionRddMulti =
        pairRdd.groupWith(pairRdd2, pairRdd, pairRdd2);

JavaPairRDD<String, Tuple4<Iterable<Double>, Iterable<String>, Iterable<Double>, Iterable<String>>> cogroupPartitionMulti =
        pairRdd.cogroup(pairRdd2, pairRdd, pairRdd2);
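The equivalent in Scala, as a sketch using the pairRdd and pairRdd2 defined in the Scala cogroup example above:


// cogroup/groupWith accept up to three other RDDs,
// yielding a tuple of four Iterables per key
val multi = pairRdd.cogroup(pairRdd2, pairRdd, pairRdd2)
// multi: RDD[(String, (Iterable[String], Iterable[String], Iterable[String], Iterable[String]))]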