Datasets often contain duplicates, and removing them is a common use case. If we want only unique elements, we can use the RDD.distinct() transformation to produce a new RDD containing only distinct items. Note, however, that distinct() is expensive, as it requires shuffling all the data over the network to ensure that we receive only one copy of each element.
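To see where that shuffle comes from: under the hood, distinct() is essentially a mapToPair to (element, dummy) pairs followed by a reduceByKey and a map back, and the reduceByKey step is what moves data across the network. The sketch below (a hypothetical DistinctInternals class using Java 8 lambdas, not part of the examples that follow) shows this equivalence on a tiny in-memory rdd.

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class DistinctInternals {

    public static void main(String[] args) {

        SparkConf sparkConf = new SparkConf().setAppName("distinct-sketch").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        JavaRDD<String> lines = jsc.parallelize(Arrays.asList("a", "b", "a", "c", "b"));

        // pair each element with a dummy value ...
        JavaPairRDD<String, Integer> keyed = lines.mapToPair(x -> new Tuple2<String, Integer>(x, 1));

        // ... reduce by key - this is the step that shuffles data over the
        // network so that equal elements meet on the same partition ...
        JavaRDD<String> uniques = keyed.reduceByKey((a, b) -> a).map(t -> t._1);

        // ... prints a, b, c (order depends on partitioning)
        for (String s : uniques.collect()) {
            System.out.println(s);
        }

        jsc.close();
    }
}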
Input Data sample
First Name,Last Name,Job Titles,Department,Full or Part-Time,Salary or Hourly,Typical Hours,Annual Salary,Hourly Rate
dubert,tomasz ,paramedic i/c,fire,f,salary,,91080.00,
edwards,tim p,lieutenant,fire,f,salary,,114846.00,
elkins,eric j,sergeant,police,f,salary,,104628.00,
estrada,luis f,police officer,police,f,salary,,96060.00,
ewing,marie a,clerk iii,police,f,salary,,53076.00,
finn,sean p,firefighter,fire,f,salary,,87006.00,
fitch,jordan m,law clerk,law,f,hourly,35,,14.51
distinct code for rdd and pair rdd
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class Distinct {

    public static void main(String[] args) {

        SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        JavaRDD<String> rdd = jsc.textFile("C:\\codebase\\scala-project\\inputdata\\small_sample");

        // distinct() on the raw rdd keeps exactly one copy of each input line
        JavaRDD<String> distinct_rdd = rdd.distinct();

        for (String string : distinct_rdd.collect()) {
            System.out.println(string);
        }

        // build a pair rdd of (first name, department) from each csv line
        JavaPairRDD<String, String> pair = rdd.mapToPair(new PairFunction<String, String, String>() {

            @Override
            public Tuple2<String, String> call(String arg0) throws Exception {
                String data[] = arg0.split(",");
                return new Tuple2<String, String>(data[0], data[3]);
            }
        });

        // distinct() on a pair rdd compares the whole (key, value) tuple
        JavaPairRDD<String, String> distinct_record = pair.distinct();

        for (Tuple2<String, String> string : distinct_record.collect()) {
            System.out.println(string._1 + " " + string._2);
        }

        jsc.close();
    }
}
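If you are on Java 8, the same pair rdd logic can be written more compactly with lambdas. This is just a sketch, assuming the same rdd variable from the code above.

// same (first name, department) pairs built with a lambda instead of
// an anonymous PairFunction - the behaviour is identical
JavaPairRDD<String, String> pair = rdd.mapToPair(line -> {
    String[] data = line.split(",");
    return new Tuple2<String, String>(data[0], data[3]);
});

// collect() returns a java.util.List, so forEach can print the tuples
pair.distinct().collect().forEach(t -> System.out.println(t._1 + " " + t._2));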
distinct using dataframe
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DataSetApi {

    public static void main(String[] args) {

        SparkSession session = SparkSession.builder().appName("Test").config("spark.sql.sources.default", "json")
                .master("local").getOrCreate();

        // read the csv file and name the columns so they can be referenced later
        Dataset<Row> dataframe = session.read().option("inferSchema", "true")
                .csv("C:\\codebase\\scala-project\\inputdata\\small_sample")
                .toDF("fname", "lname", "designation", "department", "jobtype", "NA", "NA2", "salary", "NA3");

        // distinct() returns a new dataframe with duplicate rows removed
        dataframe.distinct().show();
    }
}
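Note that distinct() on a dataframe compares entire rows. If we only want one row per combination of a few columns, similar to the pair rdd example above, the Dataset API also provides dropDuplicates(). A sketch, assuming the same dataframe variable as above:

// keep one row per (fname, department) combination - which of the
// duplicate rows survives is not guaranteed
dataframe.dropDuplicates(new String[] { "fname", "department" }).show();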