Spark distinct example for RDD, pair RDD and DataFrame

We often have duplicates in the data, and removing duplicates from a dataset is a common use case. If we want only unique elements, we can use the RDD.distinct() transformation to produce a new RDD with only distinct items. Note, however, that distinct() is expensive, as it requires shuffling all the data over the network to ensure that we receive only one copy of each element.

Input Data sample

First Name,Last Name,Job Titles,Department,Full or Part-Time,Salary or Hourly,Typical Hours,Annual Salary,Hourly Rate

dubert,tomasz ,paramedic i/c,fire,f,salary,,91080.00,
edwards,tim p,lieutenant,fire,f,salary,,114846.00,
elkins,eric j,sergeant,police,f,salary,,104628.00,
estrada,luis f,police officer,police,f,salary,,96060.00,
ewing,marie a,clerk iii,police,f,salary,,53076.00,
finn,sean p,firefighter,fire,f,salary,,87006.00,
fitch,jordan m,law clerk,law,f,hourly,35,,14.51

Distinct code for RDD and pair RDD


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

/**
 * Demonstrates {@code distinct()} on a plain RDD of text lines and on a pair RDD
 * built from the first and fourth comma-separated columns of each line.
 */
public class Distinct {

    /** Minimum number of comma-separated fields a line must have, because index 3 is read. */
    private static final int MIN_FIELDS = 4;

    /**
     * Reads the sample file, prints its distinct lines, then prints the distinct
     * (first column, fourth column) pairs.
     *
     * @param args unused
     */
    public static void main(String[] args) {

        SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local");

        // JavaSparkContext is Closeable: try-with-resources shuts the context down
        // even when one of the jobs below throws.
        try (JavaSparkContext jsc = new JavaSparkContext(sparkConf)) {

            JavaRDD<String> rdd = jsc.textFile("C:\\codebase\\scala-project\\inputdata\\small_sample");

            // Whole-line de-duplication.
            JavaRDD<String> distinctRdd = rdd.distinct();
            for (String line : distinctRdd.collect()) {
                System.out.println(line);
            }

            // Key/value view of each line: (column 0, column 3).
            JavaPairRDD<String, String> pair = rdd.mapToPair(new PairFunction<String, String, String>() {

                @Override
                public Tuple2<String, String> call(String line) throws Exception {
                    String[] fields = line.split(",");
                    // Fail with a message that identifies the bad record instead of a
                    // bare ArrayIndexOutOfBoundsException on blank or short lines.
                    if (fields.length < MIN_FIELDS) {
                        throw new IllegalArgumentException("Expected at least " + MIN_FIELDS
                                + " comma-separated fields but found " + fields.length + " in line: '" + line + "'");
                    }
                    return new Tuple2<String, String>(fields[0], fields[3]);
                }
            });

            // distinct() on a pair RDD compares the whole (key, value) tuple.
            JavaPairRDD<String, String> distinctPairs = pair.distinct();
            for (Tuple2<String, String> record : distinctPairs.collect()) {
                System.out.println(record._1 + " " + record._2);
            }
        }
    }

}

Distinct using DataFrame


import static org.apache.spark.sql.functions.col;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/**
 * Demonstrates {@code distinct()} on a DataFrame loaded from a CSV file.
 */
public class DataSetApi {

    /**
     * Loads the sample CSV, assigns column names, and shows the distinct rows.
     *
     * @param args unused
     */
    public static void main(String[] args) {

        // NOTE(review): the "spark.sql.sources.default" setting looks irrelevant here because
        // the read below names the csv format explicitly - confirm before removing it.
        SparkSession session = SparkSession.builder().appName("Test").config("spark.sql.sources.default", "json")
                .master("local").getOrCreate();

        try {
            Dataset<Row> dataframe = session.read().option("inferSchema", "true")
                    .csv("C:\\codebase\\scala-project\\inputdata\\small_sample")
                    .toDF("fname", "lname", "designation", "department", "jobtype", "NA", "NA2", "salary", "NA3");

            // Row-level de-duplication across all columns.
            dataframe.distinct().show();
        } finally {
            // Always release the session, even if reading or showing the data fails.
            session.stop();
        }
    }

}

Leave a Reply

Your email address will not be published. Required fields are marked *