Spark: finding the standard deviation and mean using RDD, DataFrame and Dataset

The standard deviation measures how much the values in a data set vary from their average (mean).

Problem

Given a list of employees with their salaries and departments, determine the standard deviation and mean of the salaries in each department.

A sample input data file, employee_info.csv, is attached.

Input Data sample

First Name,Last Name,Job Titles,Department,Full or Part-Time,Salary or Hourly,Typical Hours,Annual Salary,Hourly Rate


dubert,tomasz ,paramedic i/c,fire,f,salary,,91080.00,
edwards,tim p,lieutenant,fire,f,salary,,114846.00,
elkins,eric j,sergeant,police,f,salary,,104628.00,
estrada,luis f,police officer,police,f,salary,,96060.00,
ewing,marie a,clerk iii,police,f,salary,,53076.00,
finn,sean p,firefighter,fire,f,salary,,87006.00,
fitch,jordan m,law clerk,law,f,hourly,35,,14.51

Spark code for finding the standard deviation and mean using a pair RDD


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.util.StatCounter;
import scala.Tuple2;
import scala.Tuple3;

/**
 * Prints the standard deviation and the mean of the annual salary of every department,
 * computed with the pair-RDD API.
 *
 * <p>Each input line is split on commas and is expected to contain nine fields, with the
 * department in field 3 and the annual salary in field 7 (zero-based). Lines that do not
 * match this layout, that have an empty salary (for example hourly employees) or whose
 * salary is not a number (for example a header line) are skipped, so they neither abort
 * the job nor show up as an artificial department in the result.
 *
 * <p>Usage: {@code StandardDeviation [inputPath]}. When no path is given the built-in
 * default path is read.
 */
public class StandardDeviation {

    /** Input that is read when no path is supplied on the command line. */
    private static final String DEFAULT_INPUT_PATH = "C:\\codebase\\scala-project\\inputdata\\employee";

    /** Number of comma-separated fields a well-formed record has. */
    private static final int EXPECTED_FIELD_COUNT = 9;

    /** Zero-based position of the department name within a record. */
    private static final int DEPARTMENT_INDEX = 3;

    /** Zero-based position of the annual salary within a record. */
    private static final int SALARY_INDEX = 7;

    /**
     * Runs the job on a local Spark master and writes one line per department to standard
     * output in the form {@code <department> <standard deviation> <mean>}.
     *
     * @param args optional; {@code args[0]} overrides the default input path
     */
    public static void main(String[] args) {
        String inputPath = (args != null && args.length > 0) ? args[0] : DEFAULT_INPUT_PATH;

        SparkConf conf = new SparkConf().setAppName("pattern").setMaster("local");

        // try-with-resources stops the context even when the job fails.
        try (JavaSparkContext jsc = new JavaSparkContext(conf)) {

            JavaRDD<String> lines = jsc.textFile(inputPath);

            // The -1 limit keeps trailing empty fields so the field count stays reliable.
            JavaPairRDD<String, Double> salaryByDepartment = lines
                    .map(line -> line.split(",", -1))
                    .filter(fields -> parseSalary(fields) != null)
                    .mapToPair(fields -> new Tuple2<String, Double>(fields[DEPARTMENT_INDEX],
                            parseSalary(fields)));

            // StatCounter accumulates count, mean and variance in a single pass; the first
            // merge folds a salary into a counter, the second combines partial counters.
            JavaPairRDD<String, StatCounter> output = salaryByDepartment.aggregateByKey(new StatCounter(),
                    StatCounter::merge, StatCounter::merge);

            JavaRDD<Tuple3<String, Double, Double>> statistics = output
                    .map(stats -> new Tuple3<String, Double, Double>(stats._1(), stats._2().stdev(),
                            stats._2().mean()));

            for (Tuple3<String, Double, Double> row : statistics.collect()) {
                System.out.println(row._1() + " " + row._2() + " " + row._3());
            }
        }
    }

    /**
     * Extracts the annual salary from the fields of one record.
     *
     * @param fields the comma-separated fields of one input line
     * @return the salary, or {@code null} when the record does not have the expected number
     *         of fields or its salary field is empty or not numeric
     */
    private static Double parseSalary(String[] fields) {
        if (fields == null || fields.length != EXPECTED_FIELD_COUNT) {
            return null;
        }
        String rawSalary = fields[SALARY_INDEX].trim();
        if (rawSalary.isEmpty()) {
            return null;
        }
        try {
            return Double.valueOf(rawSalary);
        } catch (NumberFormatException ignored) {
            // Non-numeric salary (e.g. the header line): treat the record as having no salary.
            return null;
        }
    }

}

Spark code for finding the standard deviation and mean using DataFrame and Dataset


import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.mean;
import static org.apache.spark.sql.functions.stddev_pop;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/**
 * Shows the population standard deviation and the mean of the salary of every department,
 * computed with the DataFrame/Dataset API.
 *
 * <p>The input is read as CSV with schema inference and its nine columns are renamed
 * positionally; only {@code department} and {@code salary} are used by the aggregation.
 *
 * <p>Usage: {@code SD [inputPath]}. When no path is given the built-in placeholder path is
 * used.
 */
public class SD {

    /** Input that is read when no path is supplied on the command line. */
    private static final String DEFAULT_INPUT_PATH = "Input Path";

    /**
     * Runs the aggregation on a local Spark master and prints up to 100 result rows.
     *
     * @param args optional; {@code args[0]} overrides the default input path
     */
    public static void main(String[] args) {
        String inputPath = (args != null && args.length > 0) ? args[0] : DEFAULT_INPUT_PATH;

        SparkSession session = SparkSession.builder().appName("Test").master("local").getOrCreate();

        try {
            // toDF assigns names by position, so the order must match the file's column order.
            Dataset<Row> dataset = session.read().option("inferSchema", "true").csv(inputPath).toDF("fn", "ln",
                    "designation", "department", "emp_type", "full_hour", "NA", "salary", "NA2");

            dataset.groupBy(col("department")).agg(stddev_pop("salary"), mean("salary")).show(100);
        } finally {
            // Release the session's resources even when reading or aggregating fails.
            session.stop();
        }
    }

}

output

+--------------------+-------------------+------------------+
| department| stddev_pop(salary)| avg(salary)|
+--------------------+-------------------+------------------+
| HUMAN RELATIONS|1.827656706625529E7| 4664366.823529412|
| BUSINESS AFFAIRS| 17901.113404461048| 80446.425|
| CULTURAL AFFAIRS| 19475.91814362522| 87048.90909090909|
| CITY COUNCIL| 28000.689213568432| 63577.17206896553|
| LICENSE APPL COMM| 0.0| 80568.0|
| BOARD OF ELECTION| 25951.27149887194|56051.142857142855|
| BUDGET & MGMT| 24634.589482742234| 93925.3953488372|
| IPRA| 19972.289719324952| 94429.28571428571|
| GENERAL SERVICES| 22016.97480310627| 83095.5283902439|
| WATER MGMNT| 22585.087375362513| 89894.1118032787|
| BUILDINGS| 16818.445717336188| 98864.83353383462|
| BOARD OF ETHICS| 20455.039226312914| 94552.5|
| STREETS & SAN| 20351.953600368674| 84347.77570093458|
| AVIATION| 23050.82432015936| 76140.01877697841|
| POLICE| 16611.270671190465| 87836.02534889111|
| FAMILY & SUPPORT| 17429.109374489333| 79013.58878504673|
| ANIMAL CONTRL| 19315.766009450825| 66089.68421052632|
| TREASURER| 22907.562092231725| 88062.65217391304|
| ADMIN HEARNG| 21817.966626935515| 78912.94736842105|
| HEALTH| 21213.118163927607| 85488.2109375|
| LAW| 25666.418273342595| 84582.81440443214|
| DISABILITIES| 21083.999906607645| 82431.72413793103|
| POLICE BOARD| 19656.0| 86136.0|
| PROCUREMENT| 22200.974859699276| 83278.24390243902|
| CITY CLERK| 18981.993775936626| 69762.43902439025|
| HUMAN RESOURCES| 24294.372080970592| 79851.76119402985|
| DoIT| 16632.648172697624| 99681.02970297029|
| OEMC| 19392.66507531968| 73153.77822115383|
| PUBLIC LIBRARY| 19925.747703088775| 71273.28813559322|
| MAYOR'S OFFICE| 42767.8111824964| 96165.5123076923|
| INSPECTOR GEN| 23066.39814872611| 84030.66666666667|
| COPA| 34338.260581121816| 98784.70588235294|
| TRANSPORTN| 22692.692835886082| 89976.89606060603|
|COMMUNITY DEVELOP...| 18267.126036122707| 88363.25714285714|
| FIRE| 20235.504804337594| 97762.3486619425|
| FINANCE| 24245.274276582662| 73276.36466165414|
+--------------------+-------------------+------------------+