Spark Top N Records Example on Sample Data Using RDD and DataFrame

Finding outliers is an important part of data analysis because these records are typically the most interesting and unique pieces of data in the set. The point of this pattern is to find the top records for a specific criterion so that you can examine them and perhaps figure out what made them special.

Uses of this pattern

1. Catchy dashboards that display top-selling products or other items of interest.
2. Selecting interesting data for a closer look.
3. Anomaly analysis – examining odd records that stand out from the rest.

Problem to Solve:

1. Given a list of employees with their information, find the 10 highest-paid employees.

Input Data sample

First Name,Last Name,Job Titles,Department,Full or Part-Time,Salary or Hourly,Typical Hours,Annual Salary,Hourly Rate

dubert,tomasz ,paramedic i/c,fire,f,salary,,91080.00,
edwards,tim p,lieutenant,fire,f,salary,,114846.00,
elkins,eric j,sergeant,police,f,salary,,104628.00,
estrada,luis f,police officer,police,f,salary,,96060.00,
ewing,marie a,clerk iii,police,f,salary,,53076.00,
finn,sean p,firefighter,fire,f,salary,,87006.00,
fitch,jordan m,law clerk,law,f,hourly,35,,14.51

Code using the RDD API


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class TopNRecords {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("pattern").setMaster("local");

        JavaSparkContext jsc = new JavaSparkContext(conf);

        JavaRDD<String> rdd = jsc.textFile("Input Path");

        // Drop the CSV header row; otherwise "Annual Salary" fails to parse as a double.
        JavaRDD<String> records = rdd.filter(new Function<String, Boolean>() {

            @Override
            public Boolean call(String line) throws Exception {
                return !line.startsWith("First Name");
            }
        });

        // Sort descending on the Annual Salary column (index 7). Hourly
        // employees leave that field empty, so they sort with a key of 0.0.
        JavaRDD<String> sortedData = records.sortBy(new Function<String, Double>() {

            @Override
            public Double call(String line) throws Exception {
                String[] fields = line.split(",");
                return fields.length > 7 && fields[7].length() > 0
                        ? Double.parseDouble(fields[7]) : 0.0;
            }
        }, false, 1); // one partition is plenty for this small sample

        // take(10) returns the first 10 records of the sorted RDD to the driver.
        for (String record : sortedData.take(10)) {
            System.out.println(record);
        }

        jsc.close();
    }
}
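
Sorting the entire RDD only to keep 10 rows does more work than necessary. As an alternative, takeOrdered fetches the top N directly: each partition keeps just its own 10 best candidates and the driver merges those small lists, with no full shuffle-and-sort. The sketch below is a hypothetical variant of the example above and reuses its input path and salary-parsing assumptions.

import java.io.Serializable;
import java.util.Comparator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class TopNTakeOrdered {

    // Comparators shipped to executors must be Serializable.
    static class SalaryDescComparator implements Comparator<String>, Serializable {

        private static double salaryOf(String line) {
            String[] fields = line.split(",");
            return fields.length > 7 && fields[7].length() > 0
                    ? Double.parseDouble(fields[7]) : 0.0;
        }

        @Override
        public int compare(String a, String b) {
            // Reversed comparison gives descending order: higher salaries first.
            return Double.compare(salaryOf(b), salaryOf(a));
        }
    }

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("pattern").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // Skip the CSV header row, as in the example above.
        JavaRDD<String> records = jsc.textFile("Input Path")
                .filter(line -> !line.startsWith("First Name"));

        // takeOrdered returns the 10 smallest records per the comparator,
        // which with the reversed comparison are the 10 highest salaries.
        for (String record : records.takeOrdered(10, new SalaryDescComparator())) {
            System.out.println(record);
        }

        jsc.close();
    }
}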

Code using the DataFrame API


import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DataSetApi {

    public static void main(String[] args) {

        SparkSession session = SparkSession.builder().appName("Test")
                .master("local").getOrCreate();

        // header=true keeps the header row out of the data and lets
        // inferSchema type the salary column as a double instead of a string.
        Dataset<Row> dataframe = session.read()
                .option("header", "true")
                .option("inferSchema", "true")
                .csv("Input Path")
                .toDF("fname", "lname", "designation", "department", "jobtype",
                        "paytype", "hours", "salary", "rate");

        // Sort by annual salary, highest first, and keep the top 10 rows.
        // desc() places nulls last, so hourly employees (null salary) drop out.
        dataframe.sort(col("salary").desc()).limit(10).show();

        session.stop();
    }
}
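
The same top-N query can also be written in plain SQL by registering the DataFrame as a temporary view. This is a minimal sketch that assumes the session and dataframe variables (and the column names) from the example above:

// Register the DataFrame from the example above as a SQL view.
dataframe.createOrReplaceTempView("employees");

// In Spark SQL, DESC ordering places NULLs last by default, so hourly
// employees (whose annual salary is null) never enter the top 10.
session.sql("SELECT fname, lname, designation, salary "
        + "FROM employees ORDER BY salary DESC LIMIT 10").show();

Either way, Catalyst rewrites ORDER BY plus LIMIT into a single TakeOrderedAndProject physical operator, so neither version pays for a full sort of the data.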