spark custom partitioner example in java and scala – tutorial 9

While Spark’s HashPartitioner and RangePartitioner are well suited to many use cases, Spark also allows you to tune how an RDD is partitioned by providing a custom Partitioner object. This can help you further reduce communication by taking advantage of domain-specific knowledge.

To implement a custom partitioner, you need to subclass the org.apache.spark.Partitioner class and implement three methods:

1. numPartitions: Int, which returns the number of partitions you will create.

2. getPartition(key: Any): Int, which returns the partition ID (0 to numPartitions-1) for a given key.

3. equals(), the standard Java equality method. This is important to implement because Spark will need to test your Partitioner object against other instances of itself when it decides whether two of your RDDs are partitioned the same way.

Let's say our key is a Movie object, but we want to partition the data on just the movie name.
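The Movie key class itself is not shown in this tutorial; a minimal sketch of what it might look like is below. The field names (name, rating, genre) are assumptions inferred from how the class is constructed and used later. Note that a class used as an RDD key must override equals() and hashCode(), and should be serializable.

```java
// Hypothetical Movie key class; field names are assumptions based on usage below.
public class Movie implements java.io.Serializable {

    private final String name;
    private final double rating;
    private final String genre;

    public Movie(String name, double rating, String genre) {
        this.name = name;
        this.rating = rating;
        this.genre = genre;
    }

    public String getName() {
        return name;
    }

    // equals() and hashCode() are required for Movie to behave correctly as an RDD key.
    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof Movie)) {
            return false;
        }
        Movie other = (Movie) obj;
        return name.equals(other.name) && rating == other.rating && genre.equals(other.genre);
    }

    @Override
    public int hashCode() {
        return java.util.Objects.hash(name, rating, genre);
    }
}
```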

In Java


public class CustomMoviePartitioner extends org.apache.spark.Partitioner {

	private final int numPartitions;

	public CustomMoviePartitioner(int numPartitions) {
		this.numPartitions = numPartitions;
	}

	public int getNumPartitions() {
		return numPartitions;
	}

	@Override
	public int numPartitions() {
		return numPartitions;
	}

	@Override
	public int getPartition(Object key) {
		Movie movie = (Movie) key;
		// Hash only the movie name; taking the absolute value after the modulo
		// keeps the result in the valid range [0, numPartitions)
		return Math.abs(movie.getName().hashCode() % numPartitions);
	}

	@Override
	public boolean equals(Object obj) {
		if (obj instanceof CustomMoviePartitioner) {
			CustomMoviePartitioner other = (CustomMoviePartitioner) obj;
			return other.getNumPartitions() == this.getNumPartitions();
		}
		return false;
	}

	// Whenever equals() is overridden, hashCode() should be overridden as well
	@Override
	public int hashCode() {
		return numPartitions;
	}

}
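Since getPartition hashes only the name, every record with the same movie name lands in the same partition no matter what the other Movie fields contain. We can confirm this by evaluating the same expression the partitioner uses, as a standalone sketch outside of Spark:

```java
public class PartitionDemo {

    public static void main(String[] args) {
        int numPartitions = 50;

        // Same formula as CustomMoviePartitioner.getPartition, applied to the name only
        int p1 = Math.abs("Inception".hashCode() % numPartitions);
        int p2 = Math.abs("Inception".hashCode() % numPartitions);

        System.out.println(p1 == p2);                      // prints "true": same name, same partition
        System.out.println(p1 >= 0 && p1 < numPartitions); // prints "true": always a valid partition id
    }
}
```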

In Scala


import org.apache.spark.Partitioner

class MovieCustomPartitioner(numberOfPartitions: Int) extends Partitioner {

  override def numPartitions: Int = numberOfPartitions

  override def getPartition(key: Any): Int = {
    // Hash only the movie name; abs after the modulo keeps the result in [0, numPartitions)
    Math.abs(key.asInstanceOf[Movie].name.hashCode % numPartitions)
  }

  // Java equals method to let Spark compare our Partitioner objects
  override def equals(other: Any): Boolean = other match {
    case partitioner: MovieCustomPartitioner =>
      partitioner.numPartitions == numPartitions
    case _ => false
  }

  // equals and hashCode should always be overridden together
  override def hashCode: Int = numPartitions

}

Below is an example of using the custom partitioner we created above with reduceByKey.


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class Test {

	public static void main(String[] args) {

		SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local");
		JavaSparkContext jsc = new JavaSparkContext(sparkConf);
		JavaRDD<String> rdd = jsc.textFile("C:\\codebase\\scala-project\\input data\\movies_data_2");

		// Note: for reduceByKey to group keys correctly, Movie must override
		// equals() and hashCode()
		JavaPairRDD<Movie, String> reduced = rdd.mapToPair(new PairFunction<String, Movie, String>() {

			@Override
			public Tuple2<Movie, String> call(String line) throws Exception {

				String[] data = line.split(",");

				return new Tuple2<Movie, String>(new Movie(data[0], Double.parseDouble(data[1]), data[2]), line);
			}
		}).reduceByKey(new CustomMoviePartitioner(50), new Function2<String, String, String>() {

			@Override
			public String call(String record1, String record2) throws Exception {
				// Merge the two records for the same key; here we simply concatenate them
				return record1 + "|" + record2;
			}
		});

		System.out.println(reduced.count());

		jsc.close();
	}

}
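Because equals() compares only the partition count, Spark can recognize that two RDDs partitioned by separately constructed instances of our partitioner are co-partitioned, and skip an unnecessary shuffle when joining them. The equality contract can be checked in isolation with a simplified stand-in class (used here only so the sketch compiles without Spark on the classpath):

```java
// Simplified stand-in for CustomMoviePartitioner, illustrating the equals() contract
public class EqualsDemo {

    static class NamePartitioner {
        final int numPartitions;

        NamePartitioner(int numPartitions) {
            this.numPartitions = numPartitions;
        }

        @Override
        public boolean equals(Object obj) {
            return obj instanceof NamePartitioner
                    && ((NamePartitioner) obj).numPartitions == numPartitions;
        }

        @Override
        public int hashCode() {
            return numPartitions;
        }
    }

    public static void main(String[] args) {
        // Two independently created instances with the same partition count compare equal,
        // which is what lets Spark treat RDDs partitioned by them as co-partitioned
        System.out.println(new NamePartitioner(50).equals(new NamePartitioner(50))); // prints "true"
        System.out.println(new NamePartitioner(50).equals(new NamePartitioner(10))); // prints "false"
    }
}
```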