Spark SQL consecutive sequence example

In this short article, I will show how to find the longest consecutive sequence in Spark SQL. Below is the sample dataset we will be using for the calculation.

+--------+-------------+--------+-----+
|Hospital|AccountNumber|    date|Visit|
+--------+-------------+--------+-----+
|  Apollo|            1|20200901|    1|
|  Apollo|            2|20200901|    0|
|  Apollo|            3|20200901|    1|
|  Apollo|            4|20200901|    0|
|  Apollo|            1|20200902|    1|
|  Apollo|            2|20200902|    0|
|  Apollo|            3|20200902|    1|
|  Apollo|            4|20200902|    1|
|  Apollo|            1|20200903|    0|
|  Apollo|            2|20200903|    0|
|  Apollo|            3|20200903|    0|
|  Apollo|            4|20200903|    1|
|  Apollo|            1|20200904|    0|
|  Apollo|            2|20200904|    0|
|  Apollo|            3|20200904|    1|
|  Apollo|            4|20200904|    1|
+--------+-------------+--------+-----+
Let's say we want to find the AccountNumber that visited the Hospital on the most consecutive days. Below is the result we want to arrive at:

+--------+-------------+---------+------------+---------+
|Hospital|AccountNumber|max(rank)| consecutive|maxVisits|
+--------+-------------+---------+------------+---------+
|  Apollo|            4|        4|[0, 1, 1, 1]|        3|
+--------+-------------+---------+------------+---------+
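The core of the problem is a scan over each account's date-ordered visit flags for the longest run of 1s. As a standalone illustration of that scan (plain Scala, no Spark involved; the longestRun name is just for this sketch), it can be written as a foldLeft:

// Longest run of consecutive 1s, e.g. Seq(0, 1, 1, 1) => 3.
// A standalone illustration of the scan the Spark UDF below performs.
def longestRun(visits: Seq[Int]): Int =
  visits.foldLeft((0, 0)) { case ((best, current), v) =>
    if (v == 1) (math.max(best, current + 1), current + 1)
    else (best, 0)
  }._1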
Let's create a dummy DataFrame with the data shown above.

import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.collection.mutable.ListBuffer

class SparkExcel {

  // Builds the sample dataset shown above: one row per hospital, account
  // number and date, with a visit flag (1 = visited, 0 = did not visit).
  def getSampleDataFrame(sparkSession: SparkSession): DataFrame = {

    import sparkSession.implicits._

    val sequenceOfOverview = ListBuffer[(String, String, String, Int)]()

    sequenceOfOverview += Tuple4("Apollo", "1", "20200901", 1)
    sequenceOfOverview += Tuple4("Apollo", "2", "20200901", 0)
    sequenceOfOverview += Tuple4("Apollo", "3", "20200901", 1)
    sequenceOfOverview += Tuple4("Apollo", "4", "20200901", 0)

    sequenceOfOverview += Tuple4("Apollo", "1", "20200902", 1)
    sequenceOfOverview += Tuple4("Apollo", "2", "20200902", 0)
    sequenceOfOverview += Tuple4("Apollo", "3", "20200902", 1)
    sequenceOfOverview += Tuple4("Apollo", "4", "20200902", 1)

    sequenceOfOverview += Tuple4("Apollo", "1", "20200903", 0)
    sequenceOfOverview += Tuple4("Apollo", "2", "20200903", 0)
    sequenceOfOverview += Tuple4("Apollo", "3", "20200903", 0)
    sequenceOfOverview += Tuple4("Apollo", "4", "20200903", 1)

    sequenceOfOverview += Tuple4("Apollo", "1", "20200904", 0)
    sequenceOfOverview += Tuple4("Apollo", "2", "20200904", 0)
    sequenceOfOverview += Tuple4("Apollo", "3", "20200904", 1)
    sequenceOfOverview += Tuple4("Apollo", "4", "20200904", 1)

    sequenceOfOverview.toDF("Hospital", "AccountNumber", "date", "Visit")
  }

}
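As a quick sanity check, the generator can be exercised on its own. A minimal sketch, using the same local-mode session the driver below creates:

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Should print the 16-row sample table shown at the top of the article.
new SparkExcel().getSampleDataFrame(spark).show()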

Let's code the solution to find the most consecutive sequence in Spark SQL:

import com.timepasstechies.blog.SparkExcel
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object MostConsecutiveTest extends App {

  // Scans an ordered visit list and returns the longest run of consecutive 1s.
  def getConsecutiveVisit = (visitList: Seq[Int]) => {

    var count = 0
    var totalConsecutiveVisits = 0

    visitList.foreach { x =>
      if (x == 0) {
        count = 0
      } else {
        count += 1
        totalConsecutiveVisits = Math.max(totalConsecutiveVisits, count)
      }
    }

    totalConsecutiveVisits
  }

  lazy val sparkSession: SparkSession = SparkSession
    .builder()
    .master("local[*]")
    .getOrCreate()

  val excel = new SparkExcel()

  val df = excel.getSampleDataFrame(sparkSession)

  // Per hospital and account, order the rows by visit date.
  val windowSpecToCollectAllVisits = Window
    .partitionBy("Hospital", "AccountNumber")
    .orderBy(col("date"))

  // collect_list over the running window grows with every row, so the
  // highest-ranked row of each group carries the complete visit list.
  val visitListDf = df
    .withColumn(
      "cons",
      collect_list("Visit").over(windowSpecToCollectAllVisits)
    )
    .withColumn("rank", rank().over(windowSpecToCollectAllVisits))

  val aggregatedVisitsDf = visitListDf
    .groupBy("Hospital", "AccountNumber")
    .agg(max("rank"), last("cons").as("consecutive"))

  aggregatedVisitsDf.printSchema()
  aggregatedVisitsDf.show()

  val visitCountUdf =
    sparkSession.udf.register("getConsecutiveVisit", getConsecutiveVisit)

  // Apply the UDF to each account's full visit list and keep the top account.
  aggregatedVisitsDf
    .withColumn("maxVisits", visitCountUdf(col("consecutive")))
    .sort(col("maxVisits").desc)
    .limit(1)
    .show()

}
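Since the title promises Spark SQL, the same answer can also be derived in plain SQL with the classic gaps-and-islands pattern, with no UDF involved. A minimal sketch, assuming the sample DataFrame has been registered as a temp view named visits (the view name and the rn, grp, runLength aliases are ours):

df.createOrReplaceTempView("visits")

sparkSession
  .sql("""
    WITH numbered AS (
      SELECT *,
             ROW_NUMBER() OVER
               (PARTITION BY Hospital, AccountNumber ORDER BY date) AS rn
      FROM visits
    ),
    ones AS (
      -- rn minus a second row_number over only the Visit = 1 rows stays
      -- constant within each unbroken run of visits, marking the "island"
      SELECT Hospital, AccountNumber,
             rn - ROW_NUMBER() OVER
               (PARTITION BY Hospital, AccountNumber ORDER BY rn) AS grp
      FROM numbered
      WHERE Visit = 1
    )
    SELECT Hospital, AccountNumber, MAX(runLength) AS maxVisits
    FROM (
      SELECT Hospital, AccountNumber, grp, COUNT(*) AS runLength
      FROM ones
      GROUP BY Hospital, AccountNumber, grp
    ) AS runs
    GROUP BY Hospital, AccountNumber
    ORDER BY maxVisits DESC
    LIMIT 1
  """)
  .show()

On the sample data this returns Apollo, account 4, with maxVisits 3, matching the UDF-based result above.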

That's a brief look at how we can find the most consecutive sequence in Spark SQL.