In this short article, I will show how to find the second-highest average in Spark SQL. Below is the sample dataset we will use for the calculation.
+--------+-------------+--------+-----+-------+
|Hospital|AccountNumber|    date|Visit| Amount|
+--------+-------------+--------+-----+-------+
|  Apollo|            1|20200901|    1|  234.0|
|  Apollo|            2|20200901|    0|  343.0|
|  Apollo|            3|20200901|    1|  434.0|
|  Apollo|            4|20200901|    0| 5656.0|
|  Apollo|            1|20190901|    1|  234.0|
|  Apollo|            2|20190901|    0|  343.0|
|  Apollo|            3|20190901|    1|  434.0|
|  Apollo|            4|20190901|    0| 5656.0|
|  Apollo|            1|20200902|    1| 4343.0|
|  Apollo|            2|20200902|    0| 3434.0|
|  Apollo|            3|20200902|    1| 3434.0|
|  Apollo|            4|20200902|    1| 3434.0|
|  Apollo|            1|20200903|    0| 6534.0|
|  Apollo|            2|20200903|    0| 3423.0|
|  Apollo|            3|20200903|    0| 3232.0|
|  Apollo|            4|20200903|    1|  454.0|
|  Apollo|            1|20200904|    0| 3232.0|
|  Apollo|            2|20200904|    0|  232.0|
|  Apollo|            3|20200904|    1| 3232.0|
|  Apollo|            4|20200904|    1| 6767.0|
|     JMC|            1|20200903|    0|  898.0|
|     JMC|            2|20200903|    0| 7878.0|
|     JMC|            3|20200903|    0| 4545.0|
|     JMC|            4|20200903|    1| 3434.0|
|     JMC|            1|20200904|    0| 6767.0|
|     JMC|            2|20200904|    0| 3434.0|
|     JMC|            3|20200904|    1|34343.0|
|     JMC|            4|20200904|    1| 3434.0|
|     JMC|            1|20190904|    0| 6767.0|
|     JMC|            2|20190904|    0| 3434.0|
|     JMC|            3|20190904|    1|34343.0|
|     JMC|            4|20190904|    1| 3434.0|
|  Fortis|            1|20200903|    0| 2121.0|
|  Fortis|            2|20200903|    0| 1232.0|
|  Fortis|            3|20200903|    0| 2323.0|
|  Fortis|            4|20200903|    1| 2323.0|
|  Fortis|            1|20200904|    0| 2323.0|
|  Fortis|            2|20200904|    0| 2323.0|
|  Fortis|            3|20200904|    1| 2323.0|
|  Fortis|            4|20200904|    1|  323.0|
+--------+-------------+--------+-----+-------+
Let's say we want to find the hospital which collected the second-highest amount every year. Below is the result we want to arrive at:
+----+--------+-------------+----+
|year|Hospital|averageAmount|rank|
+----+--------+-------------+----+
|2019|     JMC|      11994.5|   2|
|2020|     JMC|     8091.625|   2|
+----+--------+-------------+----+
Let's create a dummy DataFrame with the data shown above.
package com.timepasstechies.blog.examples
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.collection.mutable.ListBuffer
class DataLoader {

  /**
   * Builds the small in-memory hospital-billing dataset used by the
   * second-highest-average example.
   *
   * Columns: Hospital (String), AccountNumber (String),
   * date (String, yyyyMMdd), Visit (Int), Amount (Double).
   *
   * @param sparkSession active session whose implicits provide the
   *                     tuple-to-DataFrame encoder
   * @return the sample data as a DataFrame
   */
  def loadData(sparkSession: SparkSession): DataFrame = {
    import sparkSession.implicits._

    // Immutable Seq instead of a `var` ListBuffer: nothing is appended
    // after construction, so mutability (and boxed java.lang.Integer)
    // bought nothing in the original version.
    val rows: Seq[(String, String, String, Int, Double)] = Seq(
      ("Apollo", "1", "20200901", 1, 234.0),
      ("Apollo", "2", "20200901", 0, 343.0),
      ("Apollo", "3", "20200901", 1, 434.0),
      ("Apollo", "4", "20200901", 0, 5656.0),
      ("Apollo", "1", "20190901", 1, 234.0),
      ("Apollo", "2", "20190901", 0, 343.0),
      ("Apollo", "3", "20190901", 1, 434.0),
      ("Apollo", "4", "20190901", 0, 5656.0),
      ("Apollo", "1", "20200902", 1, 4343.0),
      ("Apollo", "2", "20200902", 0, 3434.0),
      ("Apollo", "3", "20200902", 1, 3434.0),
      ("Apollo", "4", "20200902", 1, 3434.0),
      ("Apollo", "1", "20200903", 0, 6534.0),
      ("Apollo", "2", "20200903", 0, 3423.0),
      ("Apollo", "3", "20200903", 0, 3232.0),
      ("Apollo", "4", "20200903", 1, 454.0),
      ("Apollo", "1", "20200904", 0, 3232.0),
      ("Apollo", "2", "20200904", 0, 232.0),
      ("Apollo", "3", "20200904", 1, 3232.0),
      ("Apollo", "4", "20200904", 1, 6767.0),
      ("JMC", "1", "20200903", 0, 898.0),
      ("JMC", "2", "20200903", 0, 7878.0),
      ("JMC", "3", "20200903", 0, 4545.0),
      ("JMC", "4", "20200903", 1, 3434.0),
      ("JMC", "1", "20200904", 0, 6767.0),
      ("JMC", "2", "20200904", 0, 3434.0),
      ("JMC", "3", "20200904", 1, 34343.0),
      ("JMC", "4", "20200904", 1, 3434.0),
      ("JMC", "1", "20190904", 0, 6767.0),
      ("JMC", "2", "20190904", 0, 3434.0),
      ("JMC", "3", "20190904", 1, 34343.0),
      ("JMC", "4", "20190904", 1, 3434.0),
      ("Fortis", "1", "20200903", 0, 2121.0),
      ("Fortis", "2", "20200903", 0, 1232.0),
      ("Fortis", "3", "20200903", 0, 2323.0),
      ("Fortis", "4", "20200903", 1, 2323.0),
      ("Fortis", "1", "20200904", 0, 2323.0),
      ("Fortis", "2", "20200904", 0, 2323.0),
      ("Fortis", "3", "20200904", 1, 2323.0),
      ("Fortis", "4", "20200904", 1, 323.0)
    )

    rows.toDF("Hospital", "AccountNumber", "date", "Visit", "Amount")
  }
}
Let's code the solution to find the second-highest average in Spark SQL.
package com.timepasstechies.blog.examples
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
object MaxOfAverage {

  /**
   * Entry point: for every year in the sample data, finds the hospital
   * with the second-highest average billed amount and prints it.
   *
   * An explicit `main` replaces `extends App`, avoiding the
   * DelayedInit initialization-order pitfalls of the App trait.
   */
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession
      .builder()
      .appName("second-highest-average")
      .master("local[*]")
      .getOrCreate()

    try {
      val data = new DataLoader().loadData(sparkSession)

      // Average Amount per (year, Hospital); the year is extracted from
      // the yyyyMMdd string `date` column.
      val yearlyData = data
        .select(
          col("Hospital"),
          col("AccountNumber"),
          col("Visit"),
          col("Amount"),
          year(to_date(col("date").cast("String"), "yyyyMMdd")).as("year")
        )
        .groupBy("year", "Hospital")
        .agg(avg(col("Amount")).as("averageAmount"))

      // Rank hospitals within each year by average amount, highest first;
      // rank 2 is the second-highest average per year.
      val win =
        Window
          .partitionBy(col("year"))
          .orderBy(col("averageAmount").desc)

      yearlyData
        .select(col("*"), rank().over(win).as("rank"))
        .filter(col("rank") === 2)
        .show()
    } finally {
      // The original never stopped the session; release local Spark
      // resources even if the job fails.
      sparkSession.stop()
    }
  }
}
That's a brief on how we can find the second-highest average in Spark SQL.