spark zip functions – zip, zipPartitions, zipWithIndex, zipWithUniqueId example

Spark supports zipping RDDs using the functions zip, zipPartitions, zipWithIndex and zipWithUniqueId. Let's go through each of these functions with examples to understand their functionality.

Zip function

The zip function zips one RDD with another, returning key-value pairs in which the first element of each pair comes from the first RDD and the second element from the second RDD.

Let's take an example.

Below is dataset1:


DUBERT,TOMASZ ,PARAMEDIC I/C,FIRE,F,Salary,,91080.00,
EDWARDS,TIM P,LIEUTENANT,FIRE,F,Salary,,114846.00,
ELKINS,ERIC J,SERGEANT,POLICE,F,Salary,,104628.00,
ESTRADA,LUIS F,POLICE OFFICER,POLICE,F,Salary,,96060.00,
EWING,MARIE A,CLERK III,POLICE,F,Salary,,53076.00,

Below is dataset2:


2DUBERT,TOMASZ ,PARAMEDIC I/C,FIRE,F,Salary,,91080.00,
2EDWARDS,TIM P,LIEUTENANT,FIRE,F,Salary,,114846.00,
2ELKINS,ERIC J,SERGEANT,POLICE,F,Salary,,104628.00,
2ESTRADA,LUIS F,POLICE OFFICER,POLICE,F,Salary,,96060.00,
2EWING,MARIE A,CLERK III,POLICE,F,Salary,,53076.00,

There is a constraint when using the zip function: the two RDDs must have the same number of partitions and the same number of elements in each partition (for example, one RDD was created through a map on the other). If the RDDs do not line up exactly, zip fails at runtime; see the workaround sketch after the output below.

Below is the Spark code in Java:


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class PartitionFunctions {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("Test").setMaster("local");
        JavaSparkContext context = new JavaSparkContext(conf);

        // Load both datasets with the same number of partitions so zip can pair them element by element
        JavaRDD<String> rdd1 = context.textFile("C:\\codebase\\scala-project\\inputdata\\employee\\dataset1", 5);
        JavaRDD<String> rdd2 = context.textFile("C:\\codebase\\scala-project\\inputdata\\employee\\dataset2", 5);

        // Each pair holds the element from rdd1 first and the matching element from rdd2 second
        JavaPairRDD<String, String> zipped = rdd1.zip(rdd2);

        for (Tuple2<String, String> pair : zipped.collect()) {
            System.out.println(pair._1 + " " + pair._2);
        }
    }
}

Output of the above code


DUBERT,TOMASZ ,PARAMEDIC I/C,FIRE,F,Salary,,91080.00, 2DUBERT,TOMASZ ,PARAMEDIC I/C,FIRE,F,Salary,,91080.00,
EDWARDS,TIM P,LIEUTENANT,FIRE,F,Salary,,114846.00, 2EDWARDS,TIM P,LIEUTENANT,FIRE,F,Salary,,114846.00,
ELKINS,ERIC J,SERGEANT,POLICE,F,Salary,,104628.00, 2ELKINS,ERIC J,SERGEANT,POLICE,F,Salary,,104628.00,
ESTRADA,LUIS F,POLICE OFFICER,POLICE,F,Salary,,96060.00, 2ESTRADA,LUIS F,POLICE OFFICER,POLICE,F,Salary,,96060.00,
EWING,MARIE A,CLERK III,POLICE,F,Salary,,53076.00, 2EWING,MARIE A,CLERK III,POLICE,F,Salary,,53076.00,
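
When the two RDDs do not satisfy the zip constraint, a common workaround is to index both RDDs with zipWithIndex (covered below) and join them on the index. Below is a minimal sketch of this approach, not part of the original example; it assumes rdd1 and rdd2 from the code above, and the variable names indexed1, indexed2 and joined are illustrative.


// Workaround sketch: pair two RDDs that may not share the same
// partitioning by joining on the element index instead of using zip
JavaPairRDD<Long, String> indexed1 = rdd1.zipWithIndex()
        .mapToPair(t -> new Tuple2<>(t._2, t._1)); // (index, line)
JavaPairRDD<Long, String> indexed2 = rdd2.zipWithIndex()
        .mapToPair(t -> new Tuple2<>(t._2, t._1));

// Joining on the index pairs up elements even when the partitioning differs;
// sortByKey restores the original order before the index is dropped
JavaRDD<Tuple2<String, String>> joined = indexed1.join(indexed2).sortByKey().values();

Note that join does not preserve element order, which is why the sketch sorts by the index before dropping it.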

zipPartitions

The zipPartitions function is similar to zip but more flexible and gives you more control, since you pass a function that operates on the two partition iterators. zipPartitions combines multiple RDDs into a new RDD partition by partition; it requires the RDDs to have the same number of partitions, but not the same number of elements within each partition, which was a constraint of the zip function.


import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction2;

public class PartitionFunctions {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("Test").setMaster("local");
        JavaSparkContext context = new JavaSparkContext(conf);

        // Both RDDs must have the same number of partitions for zipPartitions,
        // though the element counts per partition may differ
        JavaRDD<String> rdd1 = context.textFile("C:\\codebase\\scala-project\\inputdata\\employee\\dataset1", 5);
        JavaRDD<String> rdd2 = context.textFile("C:\\codebase\\scala-project\\inputdata\\employee\\dataset2", 5);

        JavaRDD<String> zipPartition = rdd1.zipPartitions(rdd2,
                new FlatMapFunction2<Iterator<String>, Iterator<String>, String>() {

                    @Override
                    public Iterator<String> call(Iterator<String> arg0, Iterator<String> arg1) throws Exception {
                        // Concatenate elements pairwise until either partition iterator runs out
                        List<String> list = new ArrayList<>();
                        while (arg0.hasNext() && arg1.hasNext()) {
                            list.add(arg0.next() + " " + arg1.next());
                        }
                        return list.iterator();
                    }
                });

        for (String string : zipPartition.collect()) {
            System.out.println(string);
        }
    }
}
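
Note that the while loop above stops as soon as either iterator is exhausted, so when the two partitions have unequal lengths the leftover elements are silently dropped. If you want to keep them, you can drain whichever iterator still has data before returning. Below is a minimal sketch of an alternative call body, following the same list-building approach as above:


List<String> list = new ArrayList<>();
while (arg0.hasNext() && arg1.hasNext()) {
    list.add(arg0.next() + " " + arg1.next());
}
// Drain whichever side is longer so no elements are lost
while (arg0.hasNext()) {
    list.add(arg0.next());
}
while (arg1.hasNext()) {
    list.add(arg1.next());
}
return list.iterator();

The output for this example is not shown here; you can run the code to check it.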

zipWithIndex and zipWithUniqueId

The zipWithIndex function zips the RDD with its element indices: the ordering is based first on the partition index and then on the ordering of items within each partition, and it triggers a Spark job when the RDD has more than one partition. The zipWithUniqueId function zips the RDD with generated unique Long ids without triggering a job, but the ids are not guaranteed to be consecutive.

Below is an example


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class PartitionFunctions {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("Test").setMaster("local");
        JavaSparkContext context = new JavaSparkContext(conf);

        JavaRDD<String> rdd1 = context.textFile("C:\\codebase\\scala-project\\inputdata\\employee\\dataset1", 5);

        // zipWithIndex: consecutive indices based on partition order
        for (Tuple2<String, Long> pair : rdd1.zipWithIndex().collect()) {
            System.out.println(pair._1 + " " + pair._2);
        }

        // zipWithUniqueId: unique but not necessarily consecutive ids
        for (Tuple2<String, Long> pair : rdd1.zipWithUniqueId().collect()) {
            System.out.println(pair._1 + " " + pair._2);
        }
    }
}

Input data used:


DUBERT,TOMASZ ,PARAMEDIC I/C,FIRE,F,Salary,,91080.00,
EDWARDS,TIM P,LIEUTENANT,FIRE,F,Salary,,114846.00,
ELKINS,ERIC J,SERGEANT,POLICE,F,Salary,,104628.00,
ESTRADA,LUIS F,POLICE OFFICER,POLICE,F,Salary,,96060.00,
EWING,MARIE A,CLERK III,POLICE,F,Salary,,53076.00,

The output will be as below:


DUBERT,TOMASZ ,PARAMEDIC I/C,FIRE,F,Salary,,91080.00, 0
EDWARDS,TIM P,LIEUTENANT,FIRE,F,Salary,,114846.00, 1
ELKINS,ERIC J,SERGEANT,POLICE,F,Salary,,104628.00, 2
ESTRADA,LUIS F,POLICE OFFICER,POLICE,F,Salary,,96060.00, 3
EWING,MARIE A,CLERK III,POLICE,F,Salary,,53076.00, 4

DUBERT,TOMASZ ,PARAMEDIC I/C,FIRE,F,Salary,,91080.00, 0
EDWARDS,TIM P,LIEUTENANT,FIRE,F,Salary,,114846.00, 1
ELKINS,ERIC J,SERGEANT,POLICE,F,Salary,,104628.00, 2
ESTRADA,LUIS F,POLICE OFFICER,POLICE,F,Salary,,96060.00, 7
EWING,MARIE A,CLERK III,POLICE,F,Salary,,53076.00, 4
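
The second listing illustrates how zipWithUniqueId differs from zipWithIndex: with n partitions, the element at position i within partition k receives the id i * n + k, so the ids are unique but can have gaps. Here, with five partitions, the id 7 = 1 * 5 + 2 indicates that ESTRADA was presumably the second element of partition 2, the same partition that produced ELKINS's id 2 = 0 * 5 + 2.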
