Spark: finding the minimum, maximum and count using RDD, DataFrame and Dataset

Problem:

1. Given a list of employees with their department and salary, find the maximum and minimum salary in each department.
2. Given a list of employees with their department, find the count of employees in each department.

Here is a sample of the input data from the attached file employee_info.csv:
Input Data sample

First Name,Last Name,Job Titles,Department,Full or Part-Time,Salary or Hourly,Typical Hours,Annual Salary,Hourly Rate

dubert,tomasz ,paramedic i/c,fire,f,salary,,91080.00,
edwards,tim p,lieutenant,fire,f,salary,,114846.00,
elkins,eric j,sergeant,police,f,salary,,104628.00,
estrada,luis f,police officer,police,f,salary,,96060.00,
ewing,marie a,clerk iii,police,f,salary,,53076.00,
finn,sean p,firefighter,fire,f,salary,,87006.00,
fitch,jordan m,law clerk,law,f,hourly,35,,14.51

Below is the code using the RDD API:


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class Numeric {

    /** Key under which lines that cannot be parsed are grouped. */
    private static final String INVALID_RECORD = "Invalid_Record";

    /** Number of comma-separated columns expected in each input line. */
    private static final int EXPECTED_FIELDS = 9;

    /** Zero-based column index of the department name. */
    private static final int DEPARTMENT_INDEX = 3;

    /** Zero-based column index of the annual salary. */
    private static final int ANNUAL_SALARY_INDEX = 7;

    /**
     * Computes the minimum and maximum annual salary and the number of records per department
     * with the RDD API, and prints one line per key: {@code <department> <min> <max> <count>}.
     * Lines that cannot be parsed are grouped under {@value #INVALID_RECORD}.
     *
     * @param args optional; {@code args[0]} overrides the input path (default "Input Path")
     */
    public static void main(String[] args) {
        String inputPath = args.length > 0 ? args[0] : "Input Path";

        SparkConf conf = new SparkConf().setAppName("pattern").setMaster("local");

        // JavaSparkContext is Closeable: try-with-resources stops it even if the job fails.
        try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
            JavaRDD<String> rdd = jsc.textFile(inputPath);

            JavaPairRDD<String, Stats> pair =
                    rdd.mapToPair(new PairFunction<String, String, Stats>() {
                        @Override
                        public Tuple2<String, Stats> call(String line) {
                            return parseLine(line);
                        }
                    });

            JavaPairRDD<String, Stats> statsAgg =
                    pair.reduceByKey(new Function2<Stats, Stats, Stats>() {
                        @Override
                        public Stats call(Stats left, Stats right) {
                            return merge(left, right);
                        }
                    });

            for (Tuple2<String, Stats> agg : statsAgg.collect()) {
                System.out.println(agg._1 + " " + agg._2.getMinimum() + " " + agg._2.getMaximum()
                        + " " + agg._2.getCount());
            }
        }
    }

    /**
     * Parses one CSV line into a (department, single-record Stats) pair.
     *
     * <p>A line with the wrong number of fields, an empty salary, or a non-numeric salary (for
     * example the header row) is keyed as {@value #INVALID_RECORD} with count 1 and min/max 0.0.
     *
     * @param line one raw line of the input file
     * @return the key and a Stats describing exactly one record
     */
    private static Tuple2<String, Stats> parseLine(String line) {
        // Limit -1 keeps trailing empty strings, so a row whose last column is empty
        // (e.g. salaried employees with no hourly rate) still yields 9 fields.
        String[] field = line.split(",", -1);

        if (field.length == EXPECTED_FIELDS && !field[ANNUAL_SALARY_INDEX].isEmpty()) {
            try {
                double salary = Double.parseDouble(field[ANNUAL_SALARY_INDEX]);
                Stats stats = new Stats();
                stats.setCount(1);
                stats.setMinimum(salary);
                stats.setMaximum(salary);
                return new Tuple2<String, Stats>(field[DEPARTMENT_INDEX], stats);
            } catch (NumberFormatException ignored) {
                // Non-numeric salary (e.g. the header's "Annual Salary"): counted as invalid below
                // instead of failing the whole job.
            }
        }

        Stats invalid = new Stats();
        // Each invalid line counts as one record so that summing counts in merge() is exact.
        invalid.setCount(1);
        return new Tuple2<String, Stats>(INVALID_RECORD, invalid);
    }

    /**
     * Combines two partial aggregates for the same key into a new Stats, leaving both inputs
     * unmodified.
     *
     * @param left  aggregate of some records
     * @param right aggregate of other records
     * @return the aggregate of both sets of records
     */
    private static Stats merge(Stats left, Stats right) {
        Stats merged = new Stats();
        merged.setMinimum(Math.min(left.getMinimum(), right.getMinimum()));
        merged.setMaximum(Math.max(left.getMaximum(), right.getMaximum()));
        // Counts must be summed, not incremented: reduceByKey also merges partial results that
        // already cover many records (e.g. per-partition combines).
        merged.setCount(left.getCount() + right.getCount());
        return merged;
    }
}

Below is the Stats class used above:


import java.io.Serializable;import java.io.Serializable;
/**
 * Mutable aggregate of a numeric column for one key: the number of records and the smallest
 * and largest value seen.
 *
 * <p>A freshly constructed instance has count 0 and minimum/maximum 0.0. The class is
 * {@link Serializable} because instances are used as RDD values and may be shipped between JVMs.
 */
public class Stats implements Serializable {

    /**
     * Explicit serialization version, so the serialized form does not depend on the
     * compiler-computed default (which can differ between builds).
     */
    private static final long serialVersionUID = 1L;

    /** Number of records aggregated. */
    private int count;

    /** Smallest value seen. */
    private double minimum;

    /** Largest value seen. */
    private double maximum;

    /** @return the number of records aggregated */
    public int getCount() {
        return count;
    }

    /** @param count the number of records aggregated */
    public void setCount(int count) {
        this.count = count;
    }

    /** @return the smallest value seen */
    public double getMinimum() {
        return minimum;
    }

    /** @param minimum the smallest value seen */
    public void setMinimum(double minimum) {
        this.minimum = minimum;
    }

    /** @return the largest value seen */
    public double getMaximum() {
        return maximum;
    }

    /** @param maximum the largest value seen */
    public void setMaximum(double maximum) {
        this.maximum = maximum;
    }
}

Below is the output (department, minimum salary, maximum salary, count):

POLICE BOARD 66480.0 105792.0 2
HUMAN RESOURCES 37728.0 151572.0 67
ADMIN HEARNG 40392.0 156420.0 38
FIRE 36204.0 202728.0 4798
BOARD OF ELECTION 27912.0 133740.0 112
PUBLIC LIBRARY 29064.0 167004.0 708
CITY CLERK 38376.0 125292.0 82
IPRA 50124.0 122316.0 56
AVIATION 45696.0 300000.0 556
BOARD OF ETHICS 73944.0 135672.0 8
COMMUNITY DEVELOPMENT 47532.0 175020.0 210
POLICE 38376.0 260004.0 12941
LICENSE APPL COMM 80568.0 80568.0 1
PROCUREMENT 52176.0 167220.0 82
TRANSPORTN 36840.0 169500.0 396
FAMILY & SUPPORT 36204.0 175002.0 321
ANIMAL CONTRL 39528.0 130008.0 57
BUSINESS AFFAIRS 50676.0 157092.0 160
STREETS & SAN 38376.0 157092.0 321
FINANCE 31872.0 165000.0 532
GENERAL SERVICES 31872.0 157092.0 205
CITY COUNCIL 10008.0 160248.0 348
DISABILITIES 38100.0 138420.0 29
TREASURER 46356.0 137700.0 23
LAW 41640.0 173664.0 361
DoIT 58248.0 154992.0 101
BUILDINGS 38376.0 157092.0 266
Invalid_Record 0.0 0.0 7882
HUMAN RELATIONS 80568.0 7.777062E7 17
BUDGET & MGMT 60564.0 169992.0 43
HEALTH 40392.0 177000.0 512
COPA 53340.0 161856.0 17
WATER MGMNT 43632.0 169512.0 366
OEMC 37764.0 167796.0 832
CULTURAL AFFAIRS 43740.0 155040.0 66
INSPECTOR GEN 43308.0 161856.0 63
MAYOR'S OFFICE 0.96 216210.0 78

 

The same result can be achieved using the Dataset API, as shown below:


import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.RelationalGroupedDataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.*;

public class Numeric {

    /**
     * Computes, per department, the maximum and minimum annual salary and the row count with the
     * Dataset API, and shows up to 1000 result rows.
     *
     * @param args optional; {@code args[0]} overrides the input path (default "Input Path")
     */
    public static void main(String[] args) {
        String inputPath = args.length > 0 ? args[0] : "Input Path";

        SparkSession session = SparkSession.builder().appName("Test").master("local").getOrCreate();
        try {
            // header=true skips the column-name row. Without it the header is read as data, and
            // inferSchema then types "salary" as a string (because of "Annual Salary"), so
            // max/min would compare salaries lexicographically instead of numerically.
            Dataset<Row> dataset = session.read()
                    .option("header", "true")
                    .option("inferSchema", "true")
                    .csv(inputPath)
                    .toDF("fn", "ln", "designation", "department", "emp_type", "full_hour", "NA",
                            "salary", "NA2");

            RelationalGroupedDataset grouped = dataset.groupBy(col("department"));

            // Empty salary cells (hourly employees) become null and are ignored by max/min.
            grouped.agg(max("salary"), min("salary"), count("department")).show(1000);
        } finally {
            // Release the local Spark resources even if reading or aggregation fails.
            session.stop();
        }
    }
}