Pass array as UDF parameter in Spark SQL

1. Overview

In this short article I will show how to pass an array as a UDF parameter in Spark SQL.

2. The Problem

Let's say we want to find the patient who visited a hospital the most times. Each row of the input dataset below records whether an account visited on a given date (Visit is 1) or not (Visit is 0).

+--------+-------------+--------+-----+
|Hospital|AccountNumber|    date|Visit|
+--------+-------------+--------+-----+
|  Apollo|            1|20200901|    1|
|  Apollo|            2|20200901|    0|
|  Apollo|            3|20200901|    1|
|  Apollo|            4|20200901|    0|
|  Apollo|            1|20200902|    1|
|  Apollo|            2|20200902|    0|
|  Apollo|            3|20200902|    1|
|  Apollo|            4|20200902|    1|
|  Apollo|            1|20200903|    0|
|  Apollo|            2|20200903|    0|
|  Apollo|            3|20200903|    0|
|  Apollo|            4|20200903|    1|
|  Apollo|            1|20200904|    0|
|  Apollo|            2|20200904|    0|
|  Apollo|            3|20200904|    1|
|  Apollo|            4|20200904|    1|
+--------+-------------+--------+-----+
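
As a quick sanity check on the answer we should expect, the same aggregation can be sketched with plain Scala collections, without Spark. This is only an illustration: the object name VisitTotalsCheck and the tuple layout (hospital, account number, date, visit) are assumptions made for this sketch, not part of the Spark solution.

```scala
object VisitTotalsCheck {

  // (hospital, accountNumber, date, visit) rows copied from the input dataset.
  val rows: Seq[(String, String, String, Int)] = Seq(
    ("Apollo", "1", "20200901", 1), ("Apollo", "2", "20200901", 0),
    ("Apollo", "3", "20200901", 1), ("Apollo", "4", "20200901", 0),
    ("Apollo", "1", "20200902", 1), ("Apollo", "2", "20200902", 0),
    ("Apollo", "3", "20200902", 1), ("Apollo", "4", "20200902", 1),
    ("Apollo", "1", "20200903", 0), ("Apollo", "2", "20200903", 0),
    ("Apollo", "3", "20200903", 0), ("Apollo", "4", "20200903", 1),
    ("Apollo", "1", "20200904", 0), ("Apollo", "2", "20200904", 0),
    ("Apollo", "3", "20200904", 1), ("Apollo", "4", "20200904", 1)
  )

  // Group by (hospital, account) and add up the 0/1 Visit flags.
  val totals: Map[(String, String), Int] =
    rows
      .groupBy(r => (r._1, r._2))
      .map { case (key, group) => key -> group.map(_._4).sum }

  def main(args: Array[String]): Unit = {
    // Print one line per account, ordered by account number.
    totals.toSeq
      .sortBy { case ((_, account), _) => account }
      .foreach { case ((hospital, account), total) =>
        println(s"$hospital $account $total")
      }
  }
}
```

With this data, accounts 3 and 4 both total three visits, so the Spark job may legitimately report either of them as the top patient.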

3. Solution

Let's code the solution.

3.1 Create a dummy dataframe

Let's create a dummy dataframe with the data shown above.


package com.timepasstechies.blog

import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.collection.mutable.ListBuffer

class SparkExcel {

  // Builds the sample dataframe: one row per hospital, account and date.
  def getSampleDataFrame(sparkSession: SparkSession): DataFrame = {

    // Brings the toDF conversion used at the end into scope.
    import sparkSession.implicits._
    val sequenceOfOverview = ListBuffer[(String, String, String, Int)]()
    sequenceOfOverview += Tuple4("Apollo", "1", "20200901", 1)
    sequenceOfOverview += Tuple4("Apollo", "2", "20200901", 0)
    sequenceOfOverview += Tuple4("Apollo", "3", "20200901", 1)
    sequenceOfOverview += Tuple4("Apollo", "4", "20200901", 0)

    sequenceOfOverview += Tuple4("Apollo", "1", "20200902", 1)
    sequenceOfOverview += Tuple4("Apollo", "2", "20200902", 0)
    sequenceOfOverview += Tuple4("Apollo", "3", "20200902", 1)
    sequenceOfOverview += Tuple4("Apollo", "4", "20200902", 1)

    sequenceOfOverview += Tuple4("Apollo", "1", "20200903", 0)
    sequenceOfOverview += Tuple4("Apollo", "2", "20200903", 0)
    sequenceOfOverview += Tuple4("Apollo", "3", "20200903", 0)
    sequenceOfOverview += Tuple4("Apollo", "4", "20200903", 1)

    sequenceOfOverview += Tuple4("Apollo", "1", "20200904", 0)
    sequenceOfOverview += Tuple4("Apollo", "2", "20200904", 0)
    sequenceOfOverview += Tuple4("Apollo", "3", "20200904", 1)
    sequenceOfOverview += Tuple4("Apollo", "4", "20200904", 1)

    // toList gives an immutable Seq, which toDF accepts on any Scala version.
    sequenceOfOverview.toList
      .toDF("Hospital", "AccountNumber", "date", "Visit")
  }

}

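3.2 Passing an array into a UDF

In the example below, we collect each patient's Visit values into an array column named Visits, pass that column to the getConsecutiveVisit Spark UDF, and compute the patient's total number of visits. Inside the UDF, the array column arrives as a Scala Seq.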


import com.timepasstechies.blog.SparkExcel
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object ArrayInUDF extends App {

  // Receives the collected Visit flags (0 or 1) of one patient as a Seq
  // and returns their sum, i.e. the patient's total number of visits.
  def getConsecutiveVisit = (visitList: Seq[Int]) => visitList.sum

  lazy val sparkSession: SparkSession = SparkSession
    .builder()
    .master("local[*]")
    .getOrCreate()

  val excel = new SparkExcel()

  val df = excel.getSampleDataFrame(sparkSession)

  df.show()

  // Collapse each patient's rows into a single array column named Visits.
  val aggregatedVisitsDf = df
    .groupBy("Hospital", "AccountNumber")
    .agg(collect_list("Visit").as("Visits"))

  // register returns a UserDefinedFunction that can be applied to columns.
  val visitCountUdf =
    sparkSession.udf.register("getConsecutiveVisit", getConsecutiveVisit)

  // Keep the patient with the highest total; ties are broken arbitrarily.
  aggregatedVisitsDf
    .withColumn("maxVisits", visitCountUdf(col("Visits")))
    .sort(col("maxVisits").desc)
    .limit(1)
    .show()

}
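
Because the UDF is registered by name, it can also be called from a SQL query rather than through the DataFrame API. The following is a minimal sketch of that pattern, not the article's original code: the object name ArrayUdfInSql, the view name visits, the UDF name sumVisits and the reduced sample data are all assumptions made for illustration.

```scala
import org.apache.spark.sql.SparkSession

object ArrayUdfInSql {

  // Hypothetical helper: sums the 0/1 visit flags of one patient.
  val sumVisits: Seq[Int] => Int = _.sum

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("ArrayUdfInSql")
      .getOrCreate()
    import spark.implicits._

    // A reduced sample, just enough to show the call pattern.
    val df = Seq(
      ("Apollo", "1", "20200901", 1),
      ("Apollo", "1", "20200902", 1),
      ("Apollo", "2", "20200901", 0),
      ("Apollo", "2", "20200902", 1)
    ).toDF("Hospital", "AccountNumber", "date", "Visit")

    // Expose the dataframe to SQL under an assumed view name.
    df.createOrReplaceTempView("visits")

    // Register the function so SQL text can refer to it by name.
    spark.udf.register("sumVisits", sumVisits)

    // collect_list builds the array; the UDF receives it as a Seq[Int].
    spark.sql(
      """SELECT Hospital, AccountNumber,
        |       sumVisits(collect_list(Visit)) AS maxVisits
        |FROM visits
        |GROUP BY Hospital, AccountNumber
        |ORDER BY maxVisits DESC""".stripMargin
    ).show()

    spark.stop()
  }
}
```

Keeping the function as a plain Scala value makes it easy to unit test without starting a Spark session.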

That's a brief look at how we can pass an array into a Spark UDF.
