Spark inner join and outer joins example in Java and Scala – tutorial 6

Joining data together is probably one of the most common operations on a pair RDD, and Spark has a full range of options, including right and left outer joins, cross joins, and inner joins.

Dataset One (movie id, movie name, genre)


1,Super Man,Animation
2,Captain America,Adventure
3,The Hulk,Comedy
4,Iron Man,Comedy
5,Bat Man,Action
6,Spider Man,Action
7,Disaster,Action

Dataset Two (movie name, rating, timestamp)


Spider Man,4,978302174
Spider Man,4,978301398
Spider Man,4,978302091
Bat Man,5,978298709
Bat Man,4,978299000

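Let's do an inner join on the above datasets, using the movie name as the key. An inner join keeps only the movie names present in both datasets, so here only Spider Man and Bat Man appear in the result.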

In Java


// Local Spark setup. The pair RDDs built here (pairRdd, pairRdd2) are reused
// by the left and right outer join snippets further down.
SparkConf sparkConf = new SparkConf()
        .setAppName("test")
        .setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
JavaRDD<String> rdd = jsc.textFile("C:\\codebase\\scala-project\\input data\\movies_data_2");
JavaRDD<String> rdd2 = jsc.textFile("C:\\codebase\\scala-project\\input data\\movies_data");

// (movie name, rating): column 0 is the key, column 1 is parsed as a Double.
JavaPairRDD<String, Double> pairRdd = rdd.mapToPair(new PairFunction<String, String, Double>() {
    @Override
    public Tuple2<String, Double> call(String line) throws Exception {
        // limit -1 keeps trailing empty columns instead of discarding them
        String[] fields = line.split(",", -1);
        String movieName = fields[0];
        Double rating = Double.parseDouble(fields[1]);
        return new Tuple2<String, Double>(movieName, rating);
    }
});

// (movie name, genre): column 1 is the key, column 2 is the value.
JavaPairRDD<String, String> pairRdd2 = rdd2.mapToPair(new PairFunction<String, String, String>() {
    @Override
    public Tuple2<String, String> call(String line) throws Exception {
        String[] fields = line.split(",", -1);
        String movieName = fields[1];
        String genre = fields[2];
        return new Tuple2<String, String>(movieName, genre);
    }
});

// Inner join: only movie names present in both pair RDDs are emitted.
JavaPairRDD<String, Tuple2<Double, String>> innerRdd = pairRdd.join(pairRdd2);

for (Tuple2<String, Tuple2<Double, String>> entry : innerRdd.collect()) {
    String movieName = entry._1();
    Tuple2<Double, String> ratingAndGenre = entry._2();
    System.out.println(movieName + " " + ratingAndGenre._1() + " " + ratingAndGenre._2());
}

In Scala


// Local Spark setup. rddPair1 and rddPair2 are reused by the outer join snippets below.
val conf = new SparkConf().setAppName("scala spark").setMaster("local")
val sc = new SparkContext(conf)
val rdd = sc.textFile("C:\\codebase\\scala-project\\input data\\movies_data_2")
val rdd2 = sc.textFile("C:\\codebase\\scala-project\\input data\\movies_data")

// (movie name, rating). The rating is parsed to a Double, matching the Java
// example, so both languages print the same values. The -1 limit keeps trailing
// empty columns, as the Java example's split(",", -1) does.
val rddPair1 = rdd.map { line =>
  val fields = line.split(",", -1)
  (fields(0), fields(1).toDouble)
}

// (movie name, genre). Without the -1 limit, a row whose last column is empty
// (e.g. "7,Disaster,") would lose that column and fields(2) would throw.
val rddPair2 = rdd2.map { line =>
  val fields = line.split(",", -1)
  (fields(1), fields(2))
}

// Inner join: only movie names present in both pair RDDs are emitted.
rddPair1.join(rddPair2).collect().foreach { case (name, (rating, genre)) =>
  println(s"$name $rating $genre")
}

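Left Outer Join

A left outer join keeps every key from the left-hand RDD (the ratings). The genre comes back as an Optional in Java or an Option in Scala, and it is empty when the right-hand RDD has no movie with that name.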

In Java


// Left outer join: every (movie, rating) pair of pairRdd is kept; the genre
// side is an Optional that is empty when pairRdd2 has no row for that movie.
JavaPairRDD<String, Tuple2<Double, Optional<String>>> leftJoinRdd = pairRdd.leftOuterJoin(pairRdd2);

for (Tuple2<String, Tuple2<Double, Optional<String>>> row : leftJoinRdd.collect()) {
    String movieName = row._1();
    Double rating = row._2()._1();
    // orNull() yields null for a missing genre, which concatenation renders as "null"
    String genre = row._2()._2().orNull();
    System.out.println(movieName + " " + rating + " " + genre);
}

In Scala


// Left outer join: every key of rddPair1 is kept; the rddPair2 side is an
// Option, printed as "NULL" when there is no matching movie.
rddPair1.leftOuterJoin(rddPair2).collect().foreach { case (movie, (rating, genreOpt)) =>
  val genre = genreOpt.getOrElse("NULL")
  println(s"$movie $rating $genre")
}

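Right Outer Join

A right outer join keeps every key from the right-hand RDD (the movie genres). Movies with no rating, such as Super Man, get an empty Optional (Java) or None (Scala) for the rating.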

In Java


// Right outer join: every (movie, genre) pair of pairRdd2 is kept; the rating
// side is an Optional that is empty when pairRdd has no row for that movie.
JavaPairRDD<String, Tuple2<Optional<Double>, String>> rightJoin = pairRdd.rightOuterJoin(pairRdd2);

for (Tuple2<String, Tuple2<Optional<Double>, String>> row : rightJoin.collect()) {
    String movieName = row._1();
    // orNull() yields null for a missing rating, which concatenation renders as "null"
    Double rating = row._2()._1().orNull();
    String genre = row._2()._2();
    System.out.println(movieName + " " + rating + " " + genre);
}

In Scala


// Right outer join: every key of rddPair2 is kept; the rddPair1 side is an
// Option, printed as "NULL" when the movie has no rating.
rddPair1.rightOuterJoin(rddPair2).collect().foreach { case (movie, (ratingOpt, genre)) =>
  val rating = ratingOpt.getOrElse("NULL")
  println(s"$movie $rating $genre")
}