While Spark’s HashPartitioner and RangePartitioner are well suited to many use cases, Spark also allows you to tune how an RDD is partitioned by providing a custom Partitioner object. This can help you further reduce communication by taking advantage of domain-specific knowledge.
To implement a custom partitioner, you need to subclass the org.apache.spark.Partitioner class and implement three methods:
1. numPartitions: Int, which returns the number of partitions you will create.
2. getPartition(key: Any): Int, which returns the partition ID (0 to numPartitions-1) for a given key.
3. equals(), the standard Java equality method. This is important to implement because Spark needs to test your Partitioner object against other instances of itself when deciding whether two of your RDDs are partitioned the same way.
Let's say our key is a Movie object, but we want to partition the data on just the movie name.
In Java:
public class CustomMoviePartitioner extends org.apache.spark.Partitioner {

    private int numPartitions;

    public CustomMoviePartitioner(int numPartitions) {
        super();
        this.numPartitions = numPartitions;
    }

    public int getNumPartitions() {
        return numPartitions;
    }

    public void setNumPartitions(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    @Override
    public int getPartition(Object key) {
        // Partition on the movie name only, not the whole Movie object
        Movie movie = (Movie) key;
        return Math.abs(movie.getName().hashCode() % getNumPartitions());
    }

    @Override
    public int numPartitions() {
        return getNumPartitions();
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof CustomMoviePartitioner) {
            CustomMoviePartitioner other = (CustomMoviePartitioner) obj;
            return other.getNumPartitions() == this.getNumPartitions();
        }
        return false;
    }
}
In Scala:
import org.apache.spark.Partitioner

class MovieCustomPartitioner(numberOfPartitions: Int) extends Partitioner {

  override def numPartitions: Int = numberOfPartitions

  override def getPartition(key: Any): Int = {
    // Partition on the movie name only
    Math.abs(key.asInstanceOf[Movie].name.hashCode() % numPartitions)
  }

  // Java equals method to let Spark compare our Partitioner objects
  override def equals(other: Any): Boolean = other match {
    case partitioner: MovieCustomPartitioner => partitioner.numPartitions == numPartitions
    case _                                   => false
  }
}
Below is an example of using the custom partitioner we created above.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class Test {

    public static void main(String[] args) {

        SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        JavaRDD<String> rdd = jsc.textFile("C:\\codebase\\scala-project\\input data\\movies_data_2");

        JavaPairRDD<Movie, String> reduced = rdd.mapToPair(new PairFunction<String, Movie, String>() {
            @Override
            public Tuple2<Movie, String> call(String line) throws Exception {
                String[] data = line.split(",");
                return new Tuple2<Movie, String>(new Movie(data[0], Double.parseDouble(data[1]), data[2]), line);
            }
        }).reduceByKey(new CustomMoviePartitioner(50), new Function2<String, String, String>() {
            @Override
            public String call(String first, String second) throws Exception {
                // Keep the first record seen for each key
                return first;
            }
        });

        for (Tuple2<Movie, String> tuple : reduced.collect()) {
            System.out.println(tuple);
        }
    }
}
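The example above assumes a Movie class. Because Movie is used as the key of a pair RDD, it must implement equals() and hashCode() consistently, or reduceByKey will not merge records correctly. Below is a minimal sketch of such a class; the field names are assumptions matching the three comma-separated columns parsed in the example.

```java
import java.util.Objects;

// Hypothetical key class assumed by the examples above. Keys of a pair RDD
// must define equals() and hashCode() so Spark can compare and group them.
public class Movie {

    private final String name;
    private final double rating;
    private final String genre;

    public Movie(String name, double rating, String genre) {
        this.name = name;
        this.rating = rating;
        this.genre = genre;
    }

    public String getName() {
        return name;
    }

    public double getRating() {
        return rating;
    }

    public String getGenre() {
        return genre;
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof Movie)) {
            return false;
        }
        Movie other = (Movie) obj;
        return name.equals(other.name)
                && rating == other.rating
                && genre.equals(other.genre);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, rating, genre);
    }
}
```

Note that equality covers all fields, while the custom partitioner hashes only the name; that is fine, since the partitioner only needs to send all records with the same name to the same partition.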