Joining data together is probably one of the most common operations on a pair RDD, and Spark offers a full range of options, including right and left outer joins, cross joins, and inner joins.
Dataset One (file movies_data; the movie name is the second column)
1,Super Man,Animation
2,Captain America,Adventure
3,The Hulk,Comedy
4,Iron Man,Comedy
5,Bat Man,Action
6,Spider Man,Action
7,Disaster,Action
Dataset Two (file movies_data_2; the movie name is the first column)
Spider Man,4,978302174
Spider Man,4,978301398
Spider Man,4,978302091
Bat Man,5,978298709
Bat Man,4,978299000
Let's do an inner join on the two datasets above, using the movie name as the key.
In Java
// Local Spark context used by all of the join examples below.
SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);

// Dataset Two: movie name is column 0.
JavaRDD<String> rdd = jsc.textFile("C:\\codebase\\scala-project\\input data\\movies_data_2");
// Dataset One: movie name is column 1.
JavaRDD<String> rdd2 = jsc.textFile("C:\\codebase\\scala-project\\input data\\movies_data");

// Key Dataset Two by movie name; the value is column 1 parsed as a double.
JavaPairRDD<String, Double> pairRdd = rdd.mapToPair(new PairFunction<String, String, Double>() {
    @Override
    public Tuple2<String, Double> call(String line) throws Exception {
        // limit -1 keeps trailing empty fields instead of discarding them
        final String[] fields = line.split(",", -1);
        final String movie = fields[0];
        final double value = Double.parseDouble(fields[1]);
        return new Tuple2<String, Double>(movie, value);
    }
});

// Key Dataset One by movie name; the value is column 2 (the genre).
JavaPairRDD<String, String> pairRdd2 = rdd2.mapToPair(new PairFunction<String, String, String>() {
    @Override
    public Tuple2<String, String> call(String line) throws Exception {
        final String[] fields = line.split(",", -1);
        final String movie = fields[1];
        final String genre = fields[2];
        return new Tuple2<String, String>(movie, genre);
    }
});

// Inner join: only movie names present in both RDDs survive.
JavaPairRDD<String, Tuple2<Double, String>> innerRdd = pairRdd.join(pairRdd2);
for (Tuple2<String, Tuple2<Double, String>> row : innerRdd.collect()) {
    String movie = row._1;
    Tuple2<Double, String> values = row._2;
    System.out.println(movie + " " + values._1 + " " + values._2);
}
In Scala
// Local Spark context used by all of the join examples below.
val conf = new SparkConf().setAppName("scala spark").setMaster("local")
val sc = new SparkContext(conf)

// Dataset Two: movie name is column 0.
val rdd = sc.textFile("C:\\codebase\\scala-project\\input data\\movies_data_2")
// Dataset One: movie name is column 1.
val rdd2 = sc.textFile("C:\\codebase\\scala-project\\input data\\movies_data")

// Key each dataset by movie name. split(",", -1) keeps trailing empty fields
// (matching the Java version), so a line whose last column is empty yields ""
// instead of failing with ArrayIndexOutOfBoundsException.
val rddPair1 = rdd.map { line =>
  val data = line.split(",", -1)
  (data(0), data(1))
}
val rddPair2 = rdd2.map { line =>
  val data = line.split(",", -1)
  (data(1), data(2))
}

// Inner join: only movie names present in both RDDs are printed, as
// "<movie> <column 1 of Dataset Two> <genre from Dataset One>".
rddPair1.join(rddPair2).collect().foreach { case (movie, (rating, genre)) =>
  println(s"$movie $rating $genre")
}
Left Outer Join
In Java
// Left outer join: every movie in pairRdd is kept; the right-hand side is an
// Optional that is absent when pairRdd2 has no row with the same movie name.
JavaPairRDD<String, Tuple2<Double, Optional<String>>> leftJoinRdd = pairRdd.leftOuterJoin(pairRdd2);
for (Tuple2<String, Tuple2<Double, Optional<String>>> row : leftJoinRdd.collect()) {
    String movie = row._1;
    Double value = row._2._1;
    // orNull() maps an absent genre to null, which prints as "null".
    String genre = row._2._2.orNull();
    System.out.println(movie + " " + value + " " + genre);
}
In Scala
// Left outer join: every movie in rddPair1 is kept; the right-hand side is an
// Option that is None when rddPair2 has no row with the same movie name.
rddPair1
  .leftOuterJoin(rddPair2)
  .collect()
  .foreach { case (movie, (rating, genreOpt)) =>
    // Missing matches are printed as the literal text "NULL".
    println(s"$movie $rating ${genreOpt.getOrElse("NULL")}")
  }
Right Outer Join
In Java
// Right outer join: every movie in pairRdd2 is kept; the left-hand side is an
// Optional that is absent when pairRdd has no row with the same movie name.
JavaPairRDD<String, Tuple2<Optional<Double>, String>> rightJoin = pairRdd.rightOuterJoin(pairRdd2);
for (Tuple2<String, Tuple2<Optional<Double>, String>> row : rightJoin.collect()) {
    String movie = row._1;
    // orNull() maps an absent value to null, which prints as "null".
    Double value = row._2._1.orNull();
    String genre = row._2._2;
    System.out.println(movie + " " + value + " " + genre);
}
In Scala
// Right outer join: every movie in rddPair2 is kept; the left-hand side is an
// Option that is None when rddPair1 has no row with the same movie name.
rddPair1
  .rightOuterJoin(rddPair2)
  .collect()
  .foreach { case (movie, (ratingOpt, genre)) =>
    // Missing matches are printed as the literal text "NULL".
    println(s"$movie ${ratingOpt.getOrElse("NULL")} $genre")
  }