In this short article, I will show how to find the longest consecutive sequence in Spark SQL. Below is the sample dataset, which we will be using to calculate the same.
+--------+-------------+--------+-----+
|Hospital|AccountNumber| date|Visit|
+--------+-------------+--------+-----+
| Apollo| 1|20200901| 1|
| Apollo| 2|20200901| 0|
| Apollo| 3|20200901| 1|
| Apollo| 4|20200901| 0|
| Apollo| 1|20200902| 1|
| Apollo| 2|20200902| 0|
| Apollo| 3|20200902| 1|
| Apollo| 4|20200902| 1|
| Apollo| 1|20200903| 0|
| Apollo| 2|20200903| 0|
| Apollo| 3|20200903| 0|
| Apollo| 4|20200903| 1|
| Apollo| 1|20200904| 0|
| Apollo| 2|20200904| 0|
| Apollo| 3|20200904| 1|
| Apollo| 4|20200904| 1|
+--------+-------------+--------+-----+
Let's say we want to find the AccountNumber that visited the Hospital on the most consecutive days. Below is the result we want to arrive at:
+--------+-------------+---------+------------+---------+
|Hospital|AccountNumber|max(rank)| consecutive|maxVisits|
+--------+-------------+---------+------------+---------+
| Apollo| 4| 4|[0, 1, 1, 1]| 3|
+--------+-------------+---------+------------+---------+
Let's create a dummy DataFrame with the data shown above.
import org.apache.spark.sql.{DataFrame, SparkSession}

class SparkExcel {

  /** Builds the sample hospital-visit DataFrame used throughout this post.
    *
    * Columns: Hospital (string), AccountNumber (string), date (string,
    * yyyyMMdd), Visit (int flag: 1 = visited that day, 0 = did not).
    *
    * @param sparkSession active session; its implicits provide `toDF`
    * @return a 16-row DataFrame covering 4 accounts over 4 days
    */
  def getSampleDataFrame(sparkSession: SparkSession): DataFrame = {
    import sparkSession.implicits._

    // An immutable Seq of tuple literals replaces the original mutable
    // ListBuffer with sixteen repeated `+= Tuple4(...)` appends; the
    // resulting DataFrame contents are identical.
    val visits = Seq(
      ("Apollo", "1", "20200901", 1),
      ("Apollo", "2", "20200901", 0),
      ("Apollo", "3", "20200901", 1),
      ("Apollo", "4", "20200901", 0),
      ("Apollo", "1", "20200902", 1),
      ("Apollo", "2", "20200902", 0),
      ("Apollo", "3", "20200902", 1),
      ("Apollo", "4", "20200902", 1),
      ("Apollo", "1", "20200903", 0),
      ("Apollo", "2", "20200903", 0),
      ("Apollo", "3", "20200903", 0),
      ("Apollo", "4", "20200903", 1),
      ("Apollo", "1", "20200904", 0),
      ("Apollo", "2", "20200904", 0),
      ("Apollo", "3", "20200904", 1),
      ("Apollo", "4", "20200904", 1)
    )

    visits.toDF("Hospital", "AccountNumber", "date", "Visit")
  }
}
Let's code the solution that finds the consecutive sequence in Spark SQL.
import com.timepasstechies.blog.SparkExcel
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object MostConsecutiveTest {

  /** Returns the length of the longest run of consecutive 1s in the
    * per-day visit flags collected by the window below.
    *
    * The elements arrive as `Any` because Spark hands the UDF the raw
    * contents of the `collect_list` column. A `foldLeft` carrying
    * (current run length, best run length) replaces the original pair
    * of mutated `var`s; a 0 flag resets the current run.
    */
  def getConsecutiveVisit: Seq[Any] => Int = visitList => {
    val (_, best) = visitList.foldLeft((0, 0)) {
      case ((run, longest), visit) =>
        if (visit == 0) (0, longest)
        else (run + 1, math.max(longest, run + 1))
    }
    best
  }

  // Plain main() instead of `extends App` (App's delayed initialization has
  // well-known ordering pitfalls). The unused `new DataLoader()` from the
  // original — a type that was never imported or defined — is removed.
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .getOrCreate()

    val excel = new SparkExcel()
    val df = excel.getSampleDataFrame(sparkSession)

    // Cumulative window per (Hospital, AccountNumber), ordered by date:
    // each row sees the visit flags of that account up to and including
    // its own date.
    val windowSpecToCollectAllVisits = Window
      .partitionBy("Hospital", "AccountNumber")
      .orderBy(col("date"))

    val visitListDf = df
      .withColumn(
        "cons",
        collect_list("Visit").over(windowSpecToCollectAllVisits)
      )
      .withColumn("rank", rank().over(windowSpecToCollectAllVisits))

    // last("cons") picks the row holding the complete visit list — the
    // window above grows with date, so the last row per group carries
    // every flag for that account.
    val aggregatedVisitsDf = visitListDf
      .groupBy("Hospital", "AccountNumber")
      .agg(max("rank"), last("cons").as("consecutive"))

    aggregatedVisitsDf.printSchema()
    aggregatedVisitsDf.show()

    val visitCountUdf =
      sparkSession.udf.register("getConsecutiveVisit", getConsecutiveVisit)

    // Score every account by its longest run and keep the single best one.
    aggregatedVisitsDf
      .withColumn("maxVisits", visitCountUdf(col("consecutive")))
      .sort(col("maxVisits").desc)
      .limit(1)
      .show()
  }
}
That's a brief overview of how we can find the most consecutive sequence in Spark SQL.