Spark pair RDD reduceByKey, foldByKey and flatMap aggregation function examples in Scala and Java – tutorial 3

When datasets are described in terms of key/value pairs, it is common to want to aggregate statistics across all elements with the same key. The fold(), combine(), and reduce() actions available on basic RDDs are also present on pair RDDs, and Spark provides a similar set of per-key operations that combine the values sharing a key.

reduceByKey

reduceByKey() is quite similar to reduce(): both take a function and use it to combine values. reduceByKey() runs several parallel reduce operations, one for each key in the dataset, where each operation combines values that have the same key. Because datasets can have very large numbers of keys, reduceByKey() is not implemented as an action that returns a value to the user program. Instead, it returns a new RDD consisting of each key and the reduced value for that key.
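
As a minimal sketch of this behaviour (assuming a SparkContext named sc is already available, and using a small hard-coded pair RDD rather than a file):


val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
// reduceByKey is a transformation: it returns a new RDD of (key, reduced value) pairs
val sums = pairs.reduceByKey((x, y) => x + y)
sums.collect().foreach(println) // prints (a,4) and (b,2)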

Let's take the example of processing a ratings CSV file with movie_id, rating and timestamp columns, where we need to find the average rating of each movie.

In Scala

We will first load the rating data into an RDD


val conf = new SparkConf().setAppName("scala spark").setMaster("local")
val sc = new SparkContext(conf)
val rdd = sc.textFile("C:\\codebase\\scala-project\\input data\\rating_data")

We convert the plain RDD into a pair RDD using the map method, returning a Tuple2 with the movie id as the key and a custom Average object as the value.


val pairRdd = rdd.map { x =>
  val str = x.split(",")
  new Tuple2(str(0), new Average(1, str(1).toDouble, str(1).toDouble))
}

We call the reduceByKey function on pairRdd, passing a function that merges two Average objects at a time. For each key, the first call receives two of the Average objects added to the pair RDD; in later calls the first parameter holds the Average aggregated so far and the second parameter holds the next value for that key (or a partial aggregate combined on another partition), and this continues until all the Tuple2 objects for that key have been merged. Finally, the val d holds the aggregated result: the first field of each element is the key and the second field is the aggregated Average object, which can be accessed using _1 and _2.


val d = pairRdd.reduceByKey((x: Average, y: Average) => {
  x.count += y.count
  x.sum += y.sum
  x
})

d.foreach(x => {
  println(x._1 + " " + x._2.average())
})

The Average class


class Average(var count: Int, var sum: Double, var value: Double) extends Serializable {

  // count is the number of ratings merged so far, sum is their running total
  // and value carries the single original rating of the record
  def average(): Double = sum / count

}
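
To make the merge rule used in the reduce function concrete, here is a small driver-side illustration of combining two partial Average aggregates (the values are hypothetical and not part of the Spark job):


val soFar = new Average(2, 9.0, 4.5) // two ratings already merged, sum 9.0
val next = new Average(1, 3.0, 3.0)  // a single new rating
soFar.count += next.count
soFar.sum += next.sum
println(soFar.average()) // (9.0 + 3.0) / 3 = 4.0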

In Java

We will first load the rating data into an RDD


SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
JavaRDD<String> rdd = jsc.textFile("C:\\codebase\\scala-project\\input data\\rating_data");

We convert the plain RDD into a pair RDD using the mapToPair method, returning a Tuple2 with the movie id as the key and a custom Average object as the value.


JavaPairRDD<String, Average> pairRdd = rdd.mapToPair(new PairFunction<String, String, Average>() {

    @Override
    public Tuple2<String, Average> call(String x) throws Exception {
        String[] data = x.split(",");
        return new Tuple2<String, Average>(data[0],
                new Average(1, Double.parseDouble(data[1]), Double.parseDouble(data[1])));
    }

});

We call the reduceByKey function on the JavaPairRDD, passing an implementation of Function2<Average, Average, Average>. The function merges the Average objects in pairRdd two at a time: for each key, the first call receives two of the Average objects added to the pair RDD; in later calls the first parameter holds the Average aggregated so far and the second parameter holds the next value for that key (or a partial aggregate combined on another partition), and this continues until all the Tuple2 objects for that key have been merged. Finally, reduce holds the aggregated result: the first field of each element is the key and the second field is the aggregated Average object, which can be accessed using _1 and _2.


JavaPairRDD<String, Average> reduce = pairRdd.reduceByKey(new Function2<Average, Average, Average>() {

    @Override
    public Average call(Average one, Average two) throws Exception {
        one.setSum(one.getSum() + two.getSum());
        one.setCount(one.getCount() + two.getCount());
        return one;
    }

});

for (Tuple2<String, Average> avg : reduce.collect()) {
    System.out.println(avg._1 + " " + avg._2.average());
}

Output


4 4.190476190476191
5 3.1464646464646466
6 3.9014084507042255
2 3.7131782945736433
3 3.9019607843137254
1 4.188679245283019

foldByKey

foldByKey() is quite similar to fold(): both use a zero value of the same type as the data in our RDD together with a combination function. As with fold(), the zero value provided to foldByKey() should have no impact when combined with another element using your combination function.
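
For example, if the values were plain integers and the combination function were addition, 0 would be a suitable zero value. A minimal sketch, assuming a SparkContext named sc:


val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
pairs.foldByKey(0)((x, y) => x + y).collect() // e.g. Array((a,4), (b,2))

In our rating example the zero value is new Average(0, 0.0, 0.0), which adds nothing to the running count and sum.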

In Scala


val conf = new SparkConf().setAppName("scala spark").setMaster("local")
val sc = new SparkContext(conf)
val rdd = sc.textFile("C:\\codebase\\scala-project\\input data\\rating_data")

val pairRdd = rdd.map { x =>
  val str = x.split(",")
  new Tuple2(str(0), new Average(1, str(1).toDouble, str(1).toDouble))
}

val foldRdd = pairRdd.foldByKey(new Average(0, 0.0, 0.0))((x, y) => {
  x.count += y.count
  x.sum += y.sum
  x
})

foldRdd.collect().foreach(x => {
  println("fold " + x._1 + " " + x._2.average())
})

In Java


SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
JavaRDD<String> rdd = jsc.textFile("C:\\codebase\\scala-project\\input data\\rating_data");

JavaPairRDD<String, Average> pairRdd = rdd.mapToPair(new PairFunction<String, String, Average>() {

    @Override
    public Tuple2<String, Average> call(String x) throws Exception {
        String[] data = x.split(",");
        return new Tuple2<String, Average>(data[0],
                new Average(1, Double.parseDouble(data[1]), Double.parseDouble(data[1])));
    }

});

JavaPairRDD<String, Average> fold = pairRdd.foldByKey(new Average(0, 0, 0), new Function2<Average, Average, Average>() {

    @Override
    public Average call(Average one, Average two) {
        one.setCount(one.getCount() + two.getCount());
        one.setSum(one.getSum() + two.getSum());
        return one;
    }

});

for (Tuple2<String, Average> avg : fold.collect()) {
    System.out.println("Fold result - " + avg._1 + " " + avg._2.average());
}

Note : Those familiar with the combiner concept from MapReduce should note that calling reduceByKey() and foldByKey() will automatically perform combining locally on each machine before computing global totals for each key. The user does not need to specify a combiner. The more general combineByKey() interface allows you to customize combining behavior.
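
As a rough sketch of how the same per-movie average could be computed with combineByKey, using a plain (count, sum) tuple instead of the Average class (this continues from the rating rdd loaded above and is not part of the original example):


val ratings = rdd.map { line =>
  val str = line.split(",")
  (str(0), str(1).toDouble)
}

val avgByKey = ratings.combineByKey(
  (v: Double) => (1, v), // createCombiner: first value seen for a key in a partition
  (acc: (Int, Double), v: Double) => (acc._1 + 1, acc._2 + v), // mergeValue: fold another value into the partition-local combiner
  (a: (Int, Double), b: (Int, Double)) => (a._1 + b._1, a._2 + b._2)) // mergeCombiners: merge combiners across partitions

avgByKey.mapValues { case (count, sum) => sum / count }
  .collect()
  .foreach(println)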

flatMap

We will solve a word count problem using the flatMap function along with the reduceByKey function.

In Scala


val conf = new SparkConf().setAppName("scala spark").setMaster("local")
val sc = new SparkContext(conf)
val rdd = sc.textFile("C:\\codebase\\scala-project\\input data\\text_data")

rdd.flatMap { x =>
  x.split(" ")
}.map { x =>
  new Tuple2(x, 1)
}.reduceByKey((x, y) => x + y)
  .collect()
  .foreach(f => println(f._1 + " " + f._2))
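
The reason flatMap (rather than map) fits word count is that each input line yields several words: flatMap flattens the per-line arrays into a single RDD of words, whereas map would keep one array per line. A small sketch with made-up input, assuming the same SparkContext sc:


val lines = sc.parallelize(Seq("spark is fast", "spark"))
lines.map(x => x.split(" ")).collect() // Array(Array(spark, is, fast), Array(spark))
lines.flatMap(x => x.split(" ")).collect() // Array(spark, is, fast, spark)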

In Java


SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
JavaRDD<String> rdd = jsc.textFile("C:\\codebase\\scala-project\\input data\\text_data");

JavaRDD<String> flatrdd = rdd.flatMap(new FlatMapFunction<String, String>() {

    @Override
    public Iterator<String> call(String arg0) throws Exception {
        return Arrays.asList(arg0.split(" ")).iterator();
    }

});

List<Tuple2<String, Integer>> list = flatrdd.mapToPair(new PairFunction<String, String, Integer>() {

    @Override
    public Tuple2<String, Integer> call(String arg0) throws Exception {
        return new Tuple2<String, Integer>(arg0, 1);
    }

}).reduceByKey(new Function2<Integer, Integer, Integer>() {

    @Override
    public Integer call(Integer arg0, Integer arg1) throws Exception {
        return arg0 + arg1;
    }

}).collect();

for (Tuple2<String, Integer> tuple2 : list) {
    System.out.println(tuple2._1 + " " + tuple2._2);
}