Problem :
1. Given a list of employees with their department and salary, find the maximum and minimum salary in each department.
2. Given a list of employees with their department, find the count of employees in each department.
A sample of the input data is in the attached file employee_info.csv.
Input Data sample
First Name,Last Name,Job Titles,Department,Full or Part-Time,Salary or Hourly,Typical Hours,Annual Salary,Hourly Rate
dubert,tomasz ,paramedic i/c,fire,f,salary,,91080.00, edwards,tim p,lieutenant,fire,f,salary,,114846.00, elkins,eric j,sergeant,police,f,salary,,104628.00, estrada,luis f,police officer,police,f,salary,,96060.00, ewing,marie a,clerk iii,police,f,salary,,53076.00, finn,sean p,firefighter,fire,f,salary,,87006.00, fitch,jordan m,law clerk,law,f,hourly,35,,14.51
Below is the code
import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; public class Numeric { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("pattern").setMaster("local"); JavaSparkContext jsc = new JavaSparkContext(conf); JavaRDD<String> rdd = jsc.textFile("Input Path"); JavaPairRDD<String, Stats> pair = rdd.mapToPair(new PairFunction<String, String, Stats>() { @Override public Tuple2<String, Stats> call(String value) throws Exception { String data = value.toString(); String[] field = data.split(",", -1); double salary = 0; if (null != field && field.length == 9 && field[7].length() > 0) { Stats stats = new Stats(); stats.setCount(1); stats.setMaximum(Double.parseDouble(field[7])); stats.setMinimum(Double.parseDouble(field[7])); return new Tuple2<String, Stats>(field[3], stats); } return new Tuple2<String, Stats>("Invalid_Record", new Stats()); } }); JavaPairRDD<String, Stats> statsAgg = pair.reduceByKey(new Function2<Stats, Stats, Stats>() { @Override public Stats call(Stats result, Stats value) throws Exception { // TODO Auto-generated method stub if (value.getMaximum() > result.getMaximum()) { result.setMaximum(value.getMaximum()); } if (value.getMinimum() < result.getMinimum()) { result.setMinimum(value.getMinimum()); } result.setCount(result.getCount() + 1); return result; } }); for (Tuple2<String, Stats> agg : statsAgg.collect()) { System.out.println( agg._1 + " " + agg._2.getMinimum() + " " + agg._2.getMaximum() + " " + agg._2.getCount()); } } }
Below is stats class used above
import java.io.Serializable;

/**
 * Mutable holder for a record count together with a minimum and a maximum
 * value.
 *
 * <p>A newly constructed instance has a count of {@code 0} and a minimum and
 * maximum of {@code 0.0}. The class is {@link Serializable}.
 */
public class Stats implements Serializable {

    /** Explicit version id so the serialized form does not depend on compiler-generated values. */
    private static final long serialVersionUID = 1L;

    private int count;
    private double minimum;
    private double maximum;

    /**
     * @return the stored count
     */
    public int getCount() {
        return count;
    }

    /**
     * @param count the count to store
     */
    public void setCount(int count) {
        this.count = count;
    }

    /**
     * @return the stored minimum
     */
    public double getMinimum() {
        return minimum;
    }

    /**
     * @param minimum the minimum to store
     */
    public void setMinimum(double minimum) {
        this.minimum = minimum;
    }

    /**
     * @return the stored maximum
     */
    public double getMaximum() {
        return maximum;
    }

    /**
     * @param maximum the maximum to store
     */
    public void setMaximum(double maximum) {
        this.maximum = maximum;
    }

    /**
     * @return a debugging representation listing count, minimum and maximum
     */
    @Override
    public String toString() {
        return "Stats{count=" + count + ", minimum=" + minimum + ", maximum=" + maximum + "}";
    }
}
Below is the output
POLICE BOARD 66480.0 105792.0 2 HUMAN RESOURCES 37728.0 151572.0 67 ADMIN HEARNG 40392.0 156420.0 38 FIRE 36204.0 202728.0 4798 BOARD OF ELECTION 27912.0 133740.0 112 PUBLIC LIBRARY 29064.0 167004.0 708 CITY CLERK 38376.0 125292.0 82 IPRA 50124.0 122316.0 56 AVIATION 45696.0 300000.0 556 BOARD OF ETHICS 73944.0 135672.0 8 COMMUNITY DEVELOPMENT 47532.0 175020.0 210 POLICE 38376.0 260004.0 12941 LICENSE APPL COMM 80568.0 80568.0 1 PROCUREMENT 52176.0 167220.0 82 TRANSPORTN 36840.0 169500.0 396 FAMILY & SUPPORT 36204.0 175002.0 321 ANIMAL CONTRL 39528.0 130008.0 57 BUSINESS AFFAIRS 50676.0 157092.0 160 STREETS & SAN 38376.0 157092.0 321 FINANCE 31872.0 165000.0 532 GENERAL SERVICES 31872.0 157092.0 205 CITY COUNCIL 10008.0 160248.0 348 DISABILITIES 38100.0 138420.0 29 TREASURER 46356.0 137700.0 23 LAW 41640.0 173664.0 361 DoIT 58248.0 154992.0 101 BUILDINGS 38376.0 157092.0 266 Invalid_Record 0.0 0.0 7882 HUMAN RELATIONS 80568.0 7.777062E7 17 BUDGET & MGMT 60564.0 169992.0 43 HEALTH 40392.0 177000.0 512 COPA 53340.0 161856.0 17 WATER MGMNT 43632.0 169512.0 366 OEMC 37764.0 167796.0 832 CULTURAL AFFAIRS 43740.0 155040.0 66 INSPECTOR GEN 43308.0 161856.0 63 MAYOR'S OFFICE 0.96 216210.0 78
The same result can be achieved using the Dataset API, as shown below.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.RelationalGroupedDataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.*;

/**
 * Computes, per department, the maximum salary, the minimum salary and the
 * number of rows in a CSV employee file, using the Spark Dataset API.
 */
public class Numeric {

    /**
     * Reads the CSV file, names its nine columns, groups by {@code department}
     * and shows up to 1000 result rows.
     *
     * @param args optional; {@code args[0]} overrides the default input path
     */
    public static void main(String[] args) {
        String inputPath = args.length > 0 ? args[0] : "Input Path";
        SparkSession session = SparkSession.builder().appName("Test").master("local").getOrCreate();
        try {
            Dataset<Row> dataset = session.read()
                    .option("inferSchema", "true")
                    .csv(inputPath)
                    .toDF("fn", "ln", "designation", "department", "emp_type",
                            "full_hour", "NA", "salary", "NA2");

            RelationalGroupedDataset grouped = dataset.groupBy(col("department"));

            // Cast explicitly: if schema inference sees any non-numeric value in
            // the column (for example a header row read as data) it yields a
            // string column, and max/min would then compare text, not numbers.
            // NOTE(review): if the file has a header row, consider also setting
            // option("header", "true") so it is not counted as a department.
            grouped.agg(
                    max(col("salary").cast("double")).alias("max(salary)"),
                    min(col("salary").cast("double")).alias("min(salary)"),
                    count("department"))
                    .show(1000);
        } finally {
            // Release the session's resources even when the job fails.
            session.stop();
        }
    }
}