Spark Pair RDD and Transformations in Scala and Java – Tutorial 2

There are a number of ways to get pair RDDs in Spark, and many formats will directly load pair RDDs for their key/value data. If we have a regular RDD that we want to turn into a pair RDD, we can do this by running a map() function that returns key/value pairs.

Let's take an example.

We will convert the data below into key/value pairs. The key will be the first field and the value will be the entire line.


NAAACA,/shelf=0/slot=0/port=27,PTP,100000,100,6401884577,0,676,,,,
NAAACA,/shelf=0/slot=0/port=27,PTP,101500,200,6096776528,0,663,,,,
NAAACA,/shelf=0/slot=0/port=27,PTP,103000,300,5775316908,0,670,,,,
NAAACA,/shelf=0/slot=0/port=27,PTP,104500,400,6594627955,0,679,,,,

In Java

Java doesn't have a built-in tuple type, so we need to create tuples using the scala.Tuple2 class. This class is very simple: we can create an instance with new Tuple2(elem1, elem2) and can then access its elements with the ._1() and ._2() methods. Java users also need to call special versions of Spark's functions, such as mapToPair(), when creating pair RDDs.


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class TestPair {

    public static void main(String[] args) {

        SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        JavaRDD<String> rdd = jsc.textFile("C:\\codebase\\scala-project\\input data\\system_data");

        // mapToPair takes a PairFunction and returns a JavaPairRDD.
        // The key is the first comma-separated field, the value is the whole line.
        JavaPairRDD<String, String> pairs = rdd.mapToPair(new PairFunction<String, String, String>() {

            @Override
            public Tuple2<String, String> call(String input) throws Exception {
                return new Tuple2<String, String>(input.split(",")[0], input);
            }

        });

        for (Tuple2<String, String> pair : pairs.collect()) {
            System.out.println("Key " + pair._1 + " " + "value " + pair._2);
        }

    }

}

In Scala

In Scala, for the functions on keyed data to be available, we simply need to return tuples from our map function. An implicit conversion on RDDs of tuples provides the additional key/value functions.


import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object TestPair extends App {

  val conf = new SparkConf().setAppName("scala spark").setMaster("local")
  val sc = new SparkContext(conf)
  val rdd = sc.textFile("C:\\codebase\\scala-project\\input data\\system_data")

  // Map each line to a (key, value) tuple: the first field becomes the key,
  // the whole line stays as the value.
  val pairRdd = rdd.map { x =>
    val str = x.split(",")
    (str(0), x)
  }

  pairRdd.foreach(x => println(x._1 + " " + x._2))

  println(pairRdd.count())

}
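
With the sample data above, both programs key every record by NAAACA. The Java version should print lines along the lines of

Key NAAACA value NAAACA,/shelf=0/slot=0/port=27,PTP,100000,100,6401884577,0,676,,,,

while the Scala version prints the key and the full line separated by a space, followed by the record count (4 for this sample).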

When creating a pair RDD from an in-memory collection, we only need to call SparkContext.parallelize() on a collection of pairs in Scala, or JavaSparkContext.parallelizePairs() on a list of Tuple2 objects in Java.
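
As a minimal Scala sketch (assuming an existing SparkContext named sc; the keys and numbers below are made-up sample values, not part of the dataset above):


// Build a pair RDD directly from an in-memory collection of (key, value) tuples.
// The keys and counts here are illustrative only.
val inMemoryPairs = sc.parallelize(Seq(
  ("NAAACA", 100),
  ("NAAACA", 200),
  ("NBBBCB", 300)
))

// The implicit conversion to PairRDDFunctions makes key/value operations
// such as reduceByKey available on this RDD.
println(inMemoryPairs.reduceByKey(_ + _).collect().mkString(", "))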

Transformations on Pair RDDs

Since pair RDDs contain tuples, we need to pass functions that operate on tuples rather than on individual elements. Pair RDDs are still RDDs of Tuple2 objects in Java and Scala and thus support the same functions as regular RDDs, so let's see an example of filtering data.

Let's take the same dataset as below.


NAAACA,/shelf=0/slot=0/port=27,PTP,100000,100,6401884577,0,676,,,,
NAAACA,/shelf=0/slot=0/port=27,PTP,101500,200,6096776528,0,663,,,,
NAAACA,/shelf=0/slot=0/port=27,PTP,103000,300,5775316908,0,670,,,,
NAAACA,/shelf=0/slot=0/port=27,PTP,104500,400,6594627955,0,679,,,,

In Java

First we convert the normal RDD into a pair RDD using the mapToPair function, which takes a PairFunction object that returns a Tuple2 for each row. Then we call the filter method on this pair RDD to keep only the rows we need.


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class FilterKeyValue {

    public static void main(String[] args) {

        SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        JavaRDD<String> rdd = jsc.textFile("C:\\codebase\\scala-project\\input data\\system_data");

        // Key each line by its first comma-separated field.
        JavaPairRDD<String, String> pairRdd = rdd.mapToPair(new PairFunction<String, String, String>() {

            @Override
            public Tuple2<String, String> call(String x) throws Exception {
                return new Tuple2<String, String>(x.split(",")[0], x);
            }

        });

        // Keep only the rows whose fifth field is greater than zero.
        JavaPairRDD<String, String> filteredRdd = pairRdd.filter(x ->
                Integer.parseInt(x._2.split(",")[4]) > 0);

        for (Tuple2<String, String> pair : filteredRdd.collect()) {
            System.out.println("Key " + pair._1 + " Value " + pair._2);
        }

    }

}

In Scala

In Scala the conversion to a pair RDD is handled automatically through implicit conversions. These implicits turn an RDD into various wrapper classes, such as DoubleRDDFunctions and PairRDDFunctions, to expose additional functions specific to each RDD type. Also, in Scala we can use the default map method. As the Java example above shows, conversion between the specialized types of RDDs is more explicit there. This has the benefit of giving you a greater understanding of what exactly is going on, but it can be a bit more cumbersome.


import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object KeyValueFilter extends App {

  val conf = new SparkConf().setAppName("scala spark").setMaster("local")
  val sc = new SparkContext(conf)
  val rdd = sc.textFile("C:\\codebase\\scala-project\\input data\\system_data")

  // Key each line by its first comma-separated field; a plain tuple literal
  // is enough, no explicit Tuple2 constructor is needed.
  val tupleRdd = rdd.map { x =>
    (x.split(",")(0), x)
  }

  // Keep only the rows whose fifth field is greater than zero.
  val test = tupleRdd.filter(f => f._2.split(",")(4).toInt > 0)

  test.foreach(f => println(f._2))

}

Sometimes working with pairs can be awkward if we want to operate on only the value part of our pair RDD. Since this is a common pattern, Spark provides the mapValues(func) function, which applies a function to each value without changing the key.

In Java


// mapValues applies the function to the value of each pair and keeps the key unchanged.
// Here the function simply returns the value as-is, so the pairs are unchanged.
JavaPairRDD<String, String> d = pairRdd.mapValues(new Function<String, String>() {

    @Override
    public String call(String arg0) throws Exception {
        return arg0;
    }
});

In Scala


// mapValues keeps the key and applies the function to the value only.
// Here the value is returned unchanged.
val t = tupleRdd.mapValues { x =>
  x
}
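
The identity function above simply passes each value through unchanged. As a hedged sketch of a more typical use, built on the tupleRdd from the Scala filter example above, mapValues can replace each value (the full line) with just its fifth field parsed as an Int:


// Keep the key, but replace the value (the full CSV line) with its fifth
// field parsed as an Int. Illustrative sketch only.
val fifthField = tupleRdd.mapValues(line => line.split(",")(4).toInt)

fifthField.foreach { case (key, value) =>
  println(key + " " + value)
}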