1. Overview
In this short article I will show how to pass an array as a UDF parameter in Spark SQL.
2. The Problem
Let's say we want to find the patient who visited the hospital the most times. Below is the input dataset:
```
+--------+-------------+--------+-----+
|Hospital|AccountNumber|    date|Visit|
+--------+-------------+--------+-----+
|  Apollo|            1|20200901|    1|
|  Apollo|            2|20200901|    0|
|  Apollo|            3|20200901|    1|
|  Apollo|            4|20200901|    0|
|  Apollo|            1|20200902|    1|
|  Apollo|            2|20200902|    0|
|  Apollo|            3|20200902|    1|
|  Apollo|            4|20200902|    1|
|  Apollo|            1|20200903|    0|
|  Apollo|            2|20200903|    0|
|  Apollo|            3|20200903|    0|
|  Apollo|            4|20200903|    1|
|  Apollo|            1|20200904|    0|
|  Apollo|            2|20200904|    0|
|  Apollo|            3|20200904|    1|
|  Apollo|            4|20200904|    1|
+--------+-------------+--------+-----+
```
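Before writing any Spark code it helps to know the expected answer. The sketch below recomputes it in plain Scala from the rows above, with no Spark involved (the `ExpectedVisits` object and its members are illustrative names, not part of the original code). Summing the Visit flags gives 2, 0, 3 and 3 visits for accounts 1 to 4, so accounts 3 and 4 tie for the most visits.

```scala
// Plain-Scala cross-check of the expected result; no Spark required.
object ExpectedVisits {
  // (AccountNumber, date, Visit) rows copied from the table above; Hospital is always Apollo
  val rows: Seq[(String, String, Int)] = Seq(
    ("1", "20200901", 1), ("2", "20200901", 0), ("3", "20200901", 1), ("4", "20200901", 0),
    ("1", "20200902", 1), ("2", "20200902", 0), ("3", "20200902", 1), ("4", "20200902", 1),
    ("1", "20200903", 0), ("2", "20200903", 0), ("3", "20200903", 0), ("4", "20200903", 1),
    ("1", "20200904", 0), ("2", "20200904", 0), ("3", "20200904", 1), ("4", "20200904", 1)
  )

  // Total visits per account: the sum of that account's Visit flags
  val visitsPerAccount: Map[String, Int] =
    rows.groupBy(_._1).map { case (account, accountRows) => account -> accountRows.map(_._3).sum }

  // Highest visit count and every account that reaches it (sorted for stable output)
  val maxVisits: Int = visitsPerAccount.values.max
  val topAccounts: Seq[String] =
    visitsPerAccount.collect { case (account, n) if n == maxVisits => account }.toSeq.sorted

  def main(args: Array[String]): Unit = {
    println(visitsPerAccount.toSeq.sorted.mkString(", "))
    println(s"max = $maxVisits, accounts = ${topAccounts.mkString(", ")}")
  }
}
```

Because of the tie, any Spark query that keeps only one row needs a tie-breaking rule if the result has to be deterministic.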
3. Solution
Let's code the solution.
3.1 Create a dummy DataFrame
Let's create a dummy DataFrame containing the data above.
```scala
package com.timepasstechies.blog

import org.apache.spark.sql.{DataFrame, SparkSession}

class SparkExcel {

  // Builds the sample hospital-visit DataFrame shown in section 2.
  def getSampleDataFrame(sparkSession: SparkSession): DataFrame = {
    import sparkSession.implicits._

    // (Hospital, AccountNumber, date, Visit); Visit is 1 when the patient visited on that date, 0 otherwise
    val sampleRows = Seq(
      ("Apollo", "1", "20200901", 1),
      ("Apollo", "2", "20200901", 0),
      ("Apollo", "3", "20200901", 1),
      ("Apollo", "4", "20200901", 0),
      ("Apollo", "1", "20200902", 1),
      ("Apollo", "2", "20200902", 0),
      ("Apollo", "3", "20200902", 1),
      ("Apollo", "4", "20200902", 1),
      ("Apollo", "1", "20200903", 0),
      ("Apollo", "2", "20200903", 0),
      ("Apollo", "3", "20200903", 0),
      ("Apollo", "4", "20200903", 1),
      ("Apollo", "1", "20200904", 0),
      ("Apollo", "2", "20200904", 0),
      ("Apollo", "3", "20200904", 1),
      ("Apollo", "4", "20200904", 1)
    )

    sampleRows.toDF("Hospital", "AccountNumber", "date", "Visit")
  }
}
```
3.2 Passing an array into a UDF
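In the example below we group the rows by hospital and account, collect each patient's Visit values into an array column called Visits with collect_list, and pass that column to the getConsecutiveVisit UDF. Spark hands an array column to a Scala UDF as a Seq, so the UDF takes a Seq[Any] and counts the entries equal to 1, i.e. the days on which the patient actually visited. Sorting on that count in descending order and keeping the first row gives the patient with the most visits. In the sample data accounts 3 and 4 both visited three times, so AccountNumber is used as a secondary sort key to make the result deterministic.

```scala
import com.timepasstechies.blog.SparkExcel
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object ArrayInUDF extends App {

  // The array column arrives as a Seq, not an Array.
  // Count only the days on which the patient visited (Visit == 1).
  def getConsecutiveVisit = (visitList: Seq[Any]) => visitList.count(_ == 1)

  lazy val sparkSession: SparkSession = SparkSession
    .builder()
    .appName("ArrayInUDF")
    .master("local[*]")
    .getOrCreate()

  val excel = new SparkExcel()
  val df = excel.getSampleDataFrame(sparkSession)
  df.show()

  // One row per patient, with all of that patient's Visit values collected into an array
  val aggregatedVisitsDf = df
    .groupBy("Hospital", "AccountNumber")
    .agg(collect_list("Visit").as("Visits"))

  // register returns a UserDefinedFunction usable from the DataFrame API as well as from SQL
  val visitCountUdf = sparkSession.udf.register("getConsecutiveVisit", getConsecutiveVisit)

  aggregatedVisitsDf
    .withColumn("maxVisits", visitCountUdf(col("Visits")))
    .sort(col("maxVisits").desc, col("AccountNumber")) // AccountNumber breaks ties deterministically
    .limit(1)
    .show()

  sparkSession.stop()
}
```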
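Because the UDF body is an ordinary Scala function, its logic can be checked without a SparkSession. The sketch below (the `VisitUdfLogic` object and its members are hypothetical names) contrasts counting every collected entry, which only gives the number of recorded days, with counting the entries equal to 1. It also includes a `longestStreak` variant in case the goal is the longest run of consecutive visit days, as the name getConsecutiveVisit hints. That variant only makes sense if the array is in date order, and collect_list does not guarantee order after a shuffle, so the dates would have to be sorted first (for example by collecting struct(date, Visit) values and applying sort_array).

```scala
// Plain-Scala check of the UDF logic; no SparkSession needed.
// The functions take Seq[Any] because that is how the collected array reaches a Scala UDF.
object VisitUdfLogic {
  // Counts every collected entry: the number of recorded days, not the number of visits
  val countEntries: Seq[Any] => Int = visitList => visitList.size

  // Counts only the entries equal to 1, i.e. actual visits
  val countVisits: Seq[Any] => Int = visitList => visitList.count(_ == 1)

  // Hypothetical variant: the longest run of consecutive 1s.
  // Only meaningful when the Seq is already in date order.
  val longestStreak: Seq[Any] => Int = visitList =>
    visitList.foldLeft((0, 0)) { case ((current, best), v) =>
      val next = if (v == 1) current + 1 else 0 // extend the run or reset it
      (next, math.max(best, next))
    }._2

  def main(args: Array[String]): Unit = {
    // Visit flags per account from the sample table, in date order
    val visitsByAccount: Seq[(String, Seq[Any])] = Seq(
      "1" -> Seq[Any](1, 1, 0, 0),
      "2" -> Seq[Any](0, 0, 0, 0),
      "3" -> Seq[Any](1, 1, 0, 1),
      "4" -> Seq[Any](0, 1, 1, 1)
    )
    for ((account, visits) <- visitsByAccount)
      println(s"account $account: entries=${countEntries(visits)}, " +
        s"visits=${countVisits(visits)}, streak=${longestStreak(visits)}")
  }
}
```

With this data every account has four entries, accounts 3 and 4 have three visits each, and only account 4 has a streak of three consecutive days.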
That's a brief look at how to pass an array into a Spark UDF.
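Why does the parameter have to be a Seq rather than an Array? Spark converts an array column into a Scala Seq before calling a Scala UDF and, as I understand its ScalaUDF implementation, invokes the function through an erased Any => Any view. The plain-Scala sketch below imitates only that final call; it is a simulation, not Spark code, and the `SeqVersusArray` object and its members are illustrative names. A function declared on Array[Int] fails with a ClassCastException when handed a Seq, while one declared on Seq[Any] works.

```scala
import scala.util.Try

// Simulation only: mimics a UDF receiving an array column as a Seq
// and being invoked through an erased Any => Any function.
object SeqVersusArray {
  val seqParam: Seq[Any] => Int = visits => visits.count(_ == 1)
  val arrayParam: Array[Int] => Int = visits => visits.count(_ == 1)

  // Stand-in for what the UDF receives for an array<int> column: a Seq, not an Array
  val collectedVisits: Any = Seq(1, 1, 0, 1)

  // Cast the function to Any => Any and call it, capturing any exception
  def invokeErased(f: Any): Try[Any] = Try {
    val erased = f.asInstanceOf[Any => Any]
    erased(collectedVisits)
  }

  def main(args: Array[String]): Unit = {
    println(invokeErased(seqParam))   // Success(3)
    println(invokeErased(arrayParam)) // Failure wrapping a ClassCastException
  }
}
```

Declaring the element type precisely, for example Seq[Int], also works for an array<int> column and keeps the UDF body type-safe.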
Thanks for the post. The key part for me was declaring the UDF parameter as Seq[Any] rather than Array[_].