Spark has support for zipping RDDs using functions like zip, zipPartitions, zipWithIndex and zipWithUniqueId. Let's go through each of these functions with examples to understand their functionality.
Zip function
Zips one RDD with another, returning key-value pairs. The first element of each pair comes from the first RDD and the second element from the second RDD.
Let's take an example.
Below is dataset1
DUBERT,TOMASZ ,PARAMEDIC I/C,FIRE,F,Salary,,91080.00,
EDWARDS,TIM P,LIEUTENANT,FIRE,F,Salary,,114846.00,
ELKINS,ERIC J,SERGEANT,POLICE,F,Salary,,104628.00,
ESTRADA,LUIS F,POLICE OFFICER,POLICE,F,Salary,,96060.00,
EWING,MARIE A,CLERK III,POLICE,F,Salary,,53076.00,
Below is dataset2
2DUBERT,TOMASZ ,PARAMEDIC I/C,FIRE,F,Salary,,91080.00,
2EDWARDS,TIM P,LIEUTENANT,FIRE,F,Salary,,114846.00,
2ELKINS,ERIC J,SERGEANT,POLICE,F,Salary,,104628.00,
2ESTRADA,LUIS F,POLICE OFFICER,POLICE,F,Salary,,96060.00,
2EWING,MARIE A,CLERK III,POLICE,F,Salary,,53076.00,
There is a condition when using the zip function: the two RDDs must have the same number of partitions and the same number of elements in each partition, so it is typically used when one RDD has been derived from the other through a map-like transformation, as in the short sketch below.
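As a minimal sketch (not part of the original example, and reusing the JavaSparkContext named context from the full program below), the easiest way to guarantee this condition is to build the second RDD from the first with map, because map preserves both the number of partitions and the number of elements in each partition:

// Sketch only: derive the second RDD from the first so zip is always safe.
JavaRDD<String> source = context.textFile("C:\\codebase\\scala-project\\inputdata\\employee\\dataset1", 5);
// map keeps the same partitioning and element count as its parent RDD
JavaRDD<String> derived = source.map(line -> "2" + line); // illustrative transformation
// safe: same number of partitions and same number of elements per partition
JavaPairRDD<String, String> pairs = source.zip(derived);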
Below is the full Spark code in Java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class PartionFunctions {

	public static void main(String[] args) {

		SparkConf conf = new SparkConf().setAppName("Test").setMaster("local");
		JavaSparkContext context = new JavaSparkContext(conf);

		// both files are read with the same number of partitions so zip can pair them
		JavaRDD<String> rdd1 = context.textFile("C:\\codebase\\scala-project\\inputdata\\employee\\dataset1", 5);
		JavaRDD<String> rdd2 = context.textFile("C:\\codebase\\scala-project\\inputdata\\employee\\dataset2", 5);

		// zip pairs the i-th element of rdd1 with the i-th element of rdd2
		JavaPairRDD<String, String> d = rdd1.zip(rdd2);

		for (Tuple2<String, String> string : d.collect()) {
			System.out.println(string._1 + " " + string._2);
		}
	}
}
Output of the above code
DUBERT,TOMASZ ,PARAMEDIC I/C,FIRE,F,Salary,,91080.00, 2DUBERT,TOMASZ ,PARAMEDIC I/C,FIRE,F,Salary,,91080.00,
EDWARDS,TIM P,LIEUTENANT,FIRE,F,Salary,,114846.00, 2EDWARDS,TIM P,LIEUTENANT,FIRE,F,Salary,,114846.00,
ELKINS,ERIC J,SERGEANT,POLICE,F,Salary,,104628.00, 2ELKINS,ERIC J,SERGEANT,POLICE,F,Salary,,104628.00,
ESTRADA,LUIS F,POLICE OFFICER,POLICE,F,Salary,,96060.00, 2ESTRADA,LUIS F,POLICE OFFICER,POLICE,F,Salary,,96060.00,
EWING,MARIE A,CLERK III,POLICE,F,Salary,,53076.00, 2EWING,MARIE A,CLERK III,POLICE,F,Salary,,53076.00,
zipPartitions
The zipPartitions function is similar to zip but is more flexible and gives you more control, as we pass a function that operates on the two partition iterators. zipPartitions combines multiple RDDs into a new RDD partition by partition; it requires the RDDs being combined to have the same number of partitions, but not the same number of elements within each partition, which was a constraint of the zip function.
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction2;

public class PartionFunctions {

	public static void main(String[] args) {

		SparkConf conf = new SparkConf().setAppName("Test").setMaster("local");
		JavaSparkContext context = new JavaSparkContext(conf);

		// zipPartitions requires the same number of partitions in both RDDs,
		// so both files are read with the same minPartitions value
		JavaRDD<String> rdd1 = context.textFile("C:\\codebase\\scala-project\\inputdata\\employee\\dataset1", 5);
		JavaRDD<String> rdd2 = context.textFile("C:\\codebase\\scala-project\\inputdata\\employee\\dataset2", 5);

		JavaRDD<String> zipPartition = rdd1.zipPartitions(rdd2,
				new FlatMapFunction2<Iterator<String>, Iterator<String>, String>() {

					@Override
					public Iterator<String> call(Iterator<String> arg0, Iterator<String> arg1) throws Exception {
						// concatenate the matching elements of the two partition iterators
						List<String> list = new ArrayList<>();
						while (arg0.hasNext() && arg1.hasNext()) {
							list.add(arg0.next() + " " + arg1.next());
						}
						return list.iterator();
					}
				});

		for (String string : zipPartition.collect()) {
			System.out.println(string);
		}
	}
}
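Note that the function above only emits pairs while both iterators still have elements, so any extra records in the longer partition are silently dropped. Since zipPartitions does not require equal element counts, a variant like the following sketch (one possible way to handle the leftovers, not part of the original post; it reuses rdd1 and rdd2 from the program above) appends the remaining records on their own lines instead:

// Sketch: keep the unmatched tail of whichever partition is longer
JavaRDD<String> zipPartitionAll = rdd1.zipPartitions(rdd2,
		new FlatMapFunction2<Iterator<String>, Iterator<String>, String>() {

			@Override
			public Iterator<String> call(Iterator<String> left, Iterator<String> right) throws Exception {
				List<String> list = new ArrayList<>();
				// pair up elements while both partition iterators have data
				while (left.hasNext() && right.hasNext()) {
					list.add(left.next() + " " + right.next());
				}
				// append whatever is left over in rdd1's partition
				while (left.hasNext()) {
					list.add(left.next());
				}
				// append whatever is left over in rdd2's partition
				while (right.hasNext()) {
					list.add(right.next());
				}
				return list.iterator();
			}
		});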
zipWithIndex and zipWithUniqueId
The zipWithIndex function zips the RDD with its element indices (0, 1, 2, ...), while zipWithUniqueId zips the RDD with generated unique Long ids. zipWithIndex needs to trigger a Spark job when the RDD has more than one partition, whereas zipWithUniqueId does not, but its ids are not guaranteed to be consecutive.
Below is an example
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class PartionFunctions {

	public static void main(String[] args) {

		SparkConf conf = new SparkConf().setAppName("Test").setMaster("local");
		JavaSparkContext context = new JavaSparkContext(conf);

		JavaRDD<String> rdd1 = context.textFile("C:\\codebase\\scala-project\\inputdata\\employee\\dataset1", 5);

		// zipWithIndex pairs each element with its index, starting from 0
		for (Tuple2<String, Long> string : rdd1.zipWithIndex().collect()) {
			System.out.println(string._1 + " " + string._2);
		}

		// zipWithUniqueId pairs each element with a generated Long id that is
		// unique but not necessarily consecutive
		for (Tuple2<String, Long> string : rdd1.zipWithUniqueId().collect()) {
			System.out.println(string._1 + " " + string._2);
		}
	}
}
Input data used
DUBERT,TOMASZ ,PARAMEDIC I/C,FIRE,F,Salary,,91080.00,
EDWARDS,TIM P,LIEUTENANT,FIRE,F,Salary,,114846.00,
ELKINS,ERIC J,SERGEANT,POLICE,F,Salary,,104628.00,
ESTRADA,LUIS F,POLICE OFFICER,POLICE,F,Salary,,96060.00,
EWING,MARIE A,CLERK III,POLICE,F,Salary,,53076.00,
The output will be as below
DUBERT,TOMASZ ,PARAMEDIC I/C,FIRE,F,Salary,,91080.00, 0
EDWARDS,TIM P,LIEUTENANT,FIRE,F,Salary,,114846.00, 1
ELKINS,ERIC J,SERGEANT,POLICE,F,Salary,,104628.00, 2
ESTRADA,LUIS F,POLICE OFFICER,POLICE,F,Salary,,96060.00, 3
EWING,MARIE A,CLERK III,POLICE,F,Salary,,53076.00, 4

DUBERT,TOMASZ ,PARAMEDIC I/C,FIRE,F,Salary,,91080.00, 0
EDWARDS,TIM P,LIEUTENANT,FIRE,F,Salary,,114846.00, 1
ELKINS,ERIC J,SERGEANT,POLICE,F,Salary,,104628.00, 2
ESTRADA,LUIS F,POLICE OFFICER,POLICE,F,Salary,,96060.00, 7
EWING,MARIE A,CLERK III,POLICE,F,Salary,,53076.00, 4
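A note on the second block (the zipWithUniqueId output): the ids are unique but not consecutive. Spark assigns the items in the kth partition the ids k, n+k, 2n+k and so on, where n is the number of partitions. The exact values therefore depend on how textFile split the five records across the five requested partitions; in the run above the ESTRADA record apparently ended up as the second element of partition 2 (2 + 1*5 = 7), and the id 3 was never used because its partition stayed empty. zipWithIndex, by contrast, always produces the consecutive indices 0 through count-1.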
Where is the output for zipPartitions?
I missed adding the output for that one; you can run the code to check it yourself.