A window function calculates a return value for every input row of a table based on a group of rows, called the frame. Every input row can have a unique frame associated with it. This characteristic makes window functions more powerful than other functions and lets users concisely express data processing tasks that would otherwise be hard to express.
There are two other kinds of functions supported by Spark SQL that can be used to calculate a single return value. Built-in functions and user-defined functions, such as substr or round, take values from a single row as input and generate a single return value for every input row. Aggregate functions, such as SUM or MAX, operate on a group of rows and calculate a single return value for every group.
While both are very useful in practice, there is still a wide range of operations that cannot be expressed using these types of functions alone. Specifically, there was no way to operate on a group of rows while still returning a single value for every input row. This limitation makes it hard to conduct data processing tasks like calculating a moving average, calculating a cumulative sum, or accessing the value of a row appearing before the current row. Fortunately for users of Spark SQL, window functions fill this gap.
Window Functions
Spark SQL supports three kinds of window functions: ranking functions, analytic functions, and aggregate functions.
The available ranking and analytic functions are listed below.
Ranking functions:  rank, dense_rank, percent_rank, ntile, row_number
Analytic functions: cume_dist, first_value, last_value, lag, lead
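These functions can also be used through plain SQL with the OVER clause. Below is a minimal sketch, assuming the emp dataset and session from the examples later in this post (the view name "employee" is illustrative):

emp.createOrReplaceTempView("employee");
session.sql(
        "SELECT name, dep, sal, "
      + "rank() OVER (PARTITION BY dep ORDER BY sal DESC) AS rank "
      + "FROM employee").show();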
For aggregate functions, users can use any existing aggregate function as a window function, such as sum, mean, max, or min.
For more information, please check this link: https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
Let's take an example. Below is the input dataset used:
BRUNO,POLICE,500.00
ALLISON,FIRE,900.00
BLLISON,WATER,700.00
MICHAEL,POLICE,600.00
DREG,POLICE,300.00
POMMY,WATER,100.00
SUMO,POLICE,200.00
CLOVIN,FIRE,900.00
RAMBO,FIRE,300.00
SUMAN,FIRE,400.00
COLIN,WATER,800.00
Using Ranking Functions
Rank and DenseRank Function Example
The first step is to create a WindowSpec with a partitioning, an ordering, and a frame specification, as sketched below.
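Here is a minimal sketch of a WindowSpec showing all three parts, assuming the imports and columns used in the full examples below. The frame shown is only for illustration; the examples that follow omit it unless they need something other than the default.

// A WindowSpec with partitioning, ordering, and an explicit row frame.
// Long.MIN_VALUE stands for Window.unboundedPreceding and 0 for
// Window.currentRow (with Spark 2.1+ those constants can be used directly).
WindowSpec spec = Window
        .partitionBy(col("dep"))          // partitioning
        .orderBy(col("sal").desc())       // ordering
        .rowsBetween(Long.MIN_VALUE, 0);  // frame specification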
dense_rank returns the rank of rows within a partition of the result set, without any gaps in the ranking. For example, see the rank column in the dataset below: ALLISON and CLOVIN get the same rank, and SUMAN gets rank 2 without any gap in the ranking.
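A minimal sketch that produces the output below, assuming the emp DataFrame and window from the full example further down:

emp.select(col("name"), col("dep"), col("sal"),
        dense_rank().over(window).as("rank")).show();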
+-------+------+------+----+
|   name|   dep|   sal|rank|
+-------+------+------+----+
|  COLIN| WATER|800.00|   1|
|BLLISON| WATER|700.00|   2|
|  POMMY| WATER|100.00|   3|
|MICHAEL|POLICE|600.00|   1|
|  BRUNO|POLICE|500.00|   2|
|   DREG|POLICE|300.00|   3|
|   SUMO|POLICE|200.00|   4|
|ALLISON|  FIRE|900.00|   1|
| CLOVIN|  FIRE|900.00|   1|
|  SUMAN|  FIRE|400.00|   2|
|  RAMBO|  FIRE|300.00|   3|
+-------+------+------+----+
Below is the output if rank is used instead. As we can see, there is a gap in the ranking, as SUMAN gets rank 3.
+-------+------+------+----+
|   name|   dep|   sal|rank|
+-------+------+------+----+
|  COLIN| WATER|800.00|   1|
|BLLISON| WATER|700.00|   2|
|  POMMY| WATER|100.00|   3|
|MICHAEL|POLICE|600.00|   1|
|  BRUNO|POLICE|500.00|   2|
|   DREG|POLICE|300.00|   3|
|   SUMO|POLICE|200.00|   4|
|ALLISON|  FIRE|900.00|   1|
| CLOVIN|  FIRE|900.00|   1|
|  SUMAN|  FIRE|400.00|   3|
|  RAMBO|  FIRE|300.00|   4|
+-------+------+------+----+
Let's solve the problem of finding the highest and the second-highest salaried person in every department.
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;

import static org.apache.spark.sql.functions.*;

public class WindowDataFrame {

    public static void main(String[] args) {

        SparkSession session = SparkSession.builder()
                .appName("window").master("local").getOrCreate();
        session.sparkContext().setLogLevel("ERROR");

        Dataset<Row> emp = session.read()
                .csv("C:\\codebase\\scala-project\\inputdata\\small_sample")
                .toDF("name", "dep", "sal");

        // Partition by department and order each partition by salary, descending
        WindowSpec window = Window.partitionBy(col("dep"))
                .orderBy(col("sal").desc());

        // Top two salaries per department using rank
        Column columnRank = rank().over(window);
        emp.select(col("name"), col("dep"), col("sal"), columnRank.as("rank"))
                .where(col("rank").leq(2)).show();

        // Top two salaries per department using dense_rank
        Column columnDenseRank = dense_rank().over(window);
        emp.select(col("name"), col("dep"), col("sal"), columnDenseRank.as("rank"))
                .where(col("rank").leq(2)).show();
    }
}
Output for rank
+-------+------+------+----+
|   name|   dep|   sal|rank|
+-------+------+------+----+
|  COLIN| WATER|800.00|   1|
|BLLISON| WATER|700.00|   2|
|MICHAEL|POLICE|600.00|   1|
|  BRUNO|POLICE|500.00|   2|
|ALLISON|  FIRE|900.00|   1|
| CLOVIN|  FIRE|900.00|   1|
+-------+------+------+----+
Output for dense_rank
+-------+------+------+----+
|   name|   dep|   sal|rank|
+-------+------+------+----+
|  COLIN| WATER|800.00|   1|
|BLLISON| WATER|700.00|   2|
|MICHAEL|POLICE|600.00|   1|
|  BRUNO|POLICE|500.00|   2|
|ALLISON|  FIRE|900.00|   1|
| CLOVIN|  FIRE|900.00|   1|
|  SUMAN|  FIRE|400.00|   2|
+-------+------+------+----+
Using the row_number Function
This function returns the sequential number of a row within a partition of a result set, starting at 1. Unlike rank, it always produces consecutive numbers, even when rows tie on the ordering column.
public class WindowDataFrame {

    public static void main(String[] args) {

        SparkSession session = SparkSession.builder()
                .appName("window").master("local").getOrCreate();
        session.sparkContext().setLogLevel("ERROR");

        Dataset<Row> emp = session.read()
                .csv("C:\\codebase\\scala-project\\inputdata\\small_sample")
                .toDF("name", "dep", "sal");

        WindowSpec window = Window.partitionBy(col("dep"))
                .orderBy(col("sal").desc());

        // Sequential number of each row within its department
        Column columnRow = row_number().over(window);
        emp.select(col("name"), col("dep"), col("sal"),
                columnRow.as("row_num")).show();
    }
}
Output
+-------+------+------+-------+
|   name|   dep|   sal|row_num|
+-------+------+------+-------+
|  COLIN| WATER|800.00|      1|
|BLLISON| WATER|700.00|      2|
|  POMMY| WATER|100.00|      3|
|MICHAEL|POLICE|600.00|      1|
|  BRUNO|POLICE|500.00|      2|
|   DREG|POLICE|300.00|      3|
|   SUMO|POLICE|200.00|      4|
|ALLISON|  FIRE|900.00|      1|
| CLOVIN|  FIRE|900.00|      2|
|  SUMAN|  FIRE|400.00|      3|
|  RAMBO|  FIRE|300.00|      4|
+-------+------+------+-------+
Using the percent_rank and ntile Functions
percent_rank – Calculates the relative rank of a row within a group of rows, computed as (rank - 1) / (number of rows in the partition - 1).
ntile – Takes an integer n as input and divides the rows of each partition into n groups of roughly equal size, returning the group number of each row.
public class WindowDataFrame {

    public static void main(String[] args) {

        SparkSession session = SparkSession.builder()
                .appName("window").master("local").getOrCreate();
        session.sparkContext().setLogLevel("ERROR");

        Dataset<Row> emp = session.read()
                .csv("C:\\codebase\\scala-project\\inputdata\\small_sample")
                .toDF("name", "dep", "sal");

        WindowSpec window = Window.partitionBy(col("dep"))
                .orderBy(col("sal").desc());

        // Relative rank of each row within its department
        Column percentRank = percent_rank().over(window);
        emp.select(col("name"), col("dep"), col("sal"),
                percentRank.as("percentRank")).show();

        // Divide each department into two groups
        Column ntile = ntile(2).over(window);
        emp.select(col("name"), col("dep"), col("sal"),
                ntile.as("ntile")).show();
    }
}
Output for percentRank
+-------+------+------+------------------+
|   name|   dep|   sal|       percentRank|
+-------+------+------+------------------+
|  COLIN| WATER|800.00|               0.0|
|BLLISON| WATER|700.00|               0.5|
|  POMMY| WATER|100.00|               1.0|
|MICHAEL|POLICE|600.00|               0.0|
|  BRUNO|POLICE|500.00|0.3333333333333333|
|   DREG|POLICE|300.00|0.6666666666666666|
|   SUMO|POLICE|200.00|               1.0|
|ALLISON|  FIRE|900.00|               0.0|
| CLOVIN|  FIRE|900.00|               0.0|
|  SUMAN|  FIRE|400.00|0.6666666666666666|
|  RAMBO|  FIRE|300.00|               1.0|
+-------+------+------+------------------+
Output for ntile
+-------+------+------+-----+
|   name|   dep|   sal|ntile|
+-------+------+------+-----+
|  COLIN| WATER|800.00|    1|
|BLLISON| WATER|700.00|    1|
|  POMMY| WATER|100.00|    2|
|MICHAEL|POLICE|600.00|    1|
|  BRUNO|POLICE|500.00|    1|
|   DREG|POLICE|300.00|    2|
|   SUMO|POLICE|200.00|    2|
|ALLISON|  FIRE|900.00|    1|
| CLOVIN|  FIRE|900.00|    1|
|  SUMAN|  FIRE|400.00|    2|
|  RAMBO|  FIRE|300.00|    2|
+-------+------+------+-----+
Using Analytic Functions
cume_dist – Calculates the cumulative distribution of a value in a group of values, i.e. the relative position of the value in the group: the number of rows with a value less than or equal to the current value, divided by the total number of rows in the partition.
public class WindowDataFrame {

    public static void main(String[] args) {

        SparkSession session = SparkSession.builder()
                .appName("window").master("local").getOrCreate();
        session.sparkContext().setLogLevel("ERROR");

        Dataset<Row> emp = session.read()
                .csv("C:\\codebase\\scala-project\\inputdata\\small_sample")
                .toDF("name", "dep", "sal");

        WindowSpec window = Window.partitionBy(col("dep"))
                .orderBy(col("sal").desc());

        // Cumulative distribution of each salary within its department
        Column cumeDist = cume_dist().over(window);
        emp.select(col("name"), col("dep"), col("sal"),
                cumeDist.as("cume_dist")).show();
    }
}
Output
+-------+------+------+------------------+
|   name|   dep|   sal|         cume_dist|
+-------+------+------+------------------+
|  COLIN| WATER|800.00|0.3333333333333333|
|BLLISON| WATER|700.00|0.6666666666666666|
|  POMMY| WATER|100.00|               1.0|
|MICHAEL|POLICE|600.00|              0.25|
|  BRUNO|POLICE|500.00|               0.5|
|   DREG|POLICE|300.00|              0.75|
|   SUMO|POLICE|200.00|               1.0|
|ALLISON|  FIRE|900.00|               0.5|
| CLOVIN|  FIRE|900.00|               0.5|
|  SUMAN|  FIRE|400.00|              0.75|
|  RAMBO|  FIRE|300.00|               1.0|
+-------+------+------+------------------+
first_value
Returns the first value within each partition. With the descending ordering used here, that is the highest salary in each department.
public class WindowDataFrame {

    public static void main(String[] args) {

        SparkSession session = SparkSession.builder()
                .appName("window").master("local").getOrCreate();
        session.sparkContext().setLogLevel("ERROR");

        Dataset<Row> emp = session.read()
                .csv("C:\\codebase\\scala-project\\inputdata\\small_sample")
                .toDF("name", "dep", "sal");

        WindowSpec window = Window.partitionBy(col("dep"))
                .orderBy(col("sal").desc());

        // First value of the frame, i.e. the highest salary in each department
        Column first = first(col("sal")).over(window);
        emp.select(col("name"), col("dep"), col("sal"),
                first.as("first")).show();
    }
}
Output
+-------+------+------+------+
|   name|   dep|   sal| first|
+-------+------+------+------+
|  COLIN| WATER|800.00|800.00|
|BLLISON| WATER|700.00|800.00|
|  POMMY| WATER|100.00|800.00|
|MICHAEL|POLICE|600.00|600.00|
|  BRUNO|POLICE|500.00|600.00|
|   DREG|POLICE|300.00|600.00|
|   SUMO|POLICE|200.00|600.00|
|ALLISON|  FIRE|900.00|900.00|
| CLOVIN|  FIRE|900.00|900.00|
|  SUMAN|  FIRE|400.00|900.00|
|  RAMBO|  FIRE|300.00|900.00|
+-------+------+------+------+
last_value
Returns the last value within each window frame. We need to specify the frame here because the default frame is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, so last_value never looks beyond the current row unless the frame is changed.
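For reference, here is the implicit default frame written out explicitly (a sketch; Long.MIN_VALUE stands for Window.unboundedPreceding and 0 for Window.currentRow):

// Equivalent to the default RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
WindowSpec defaultFrame = Window.partitionBy(col("dep"))
        .orderBy(col("sal").desc())
        .rangeBetween(Long.MIN_VALUE, 0);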
public class WindowDataFrame {

    public static void main(String[] args) {

        SparkSession session = SparkSession.builder()
                .appName("window").master("local").getOrCreate();
        session.sparkContext().setLogLevel("ERROR");

        Dataset<Row> emp = session.read()
                .csv("C:\\codebase\\scala-project\\inputdata\\small_sample")
                .toDF("name", "dep", "sal");

        /*
         * 0 represents Window.currentRow: since the preceding value is 0, no
         * rows before the current row are considered. Long.MAX_VALUE
         * represents Window.unboundedFollowing, so all the rows after the
         * current row are considered. With Spark 2.1 and above these
         * constants can be used directly.
         */
        WindowSpec window = Window.partitionBy(col("dep"))
                .orderBy(col("sal").desc())
                .rowsBetween(0, Long.MAX_VALUE);

        Column last = last(col("sal")).over(window);
        emp.select(col("name"), col("dep"), col("sal"),
                last.as("last")).show();
    }
}
Output
+-------+------+------+------+
|   name|   dep|   sal|  last|
+-------+------+------+------+
|  COLIN| WATER|800.00|100.00|
|BLLISON| WATER|700.00|100.00|
|  POMMY| WATER|100.00|100.00|
|MICHAEL|POLICE|600.00|200.00|
|  BRUNO|POLICE|500.00|200.00|
|   DREG|POLICE|300.00|200.00|
|   SUMO|POLICE|200.00|200.00|
|ALLISON|  FIRE|900.00|300.00|
| CLOVIN|  FIRE|900.00|300.00|
|  SUMAN|  FIRE|400.00|300.00|
|  RAMBO|  FIRE|300.00|300.00|
+-------+------+------+------+
Lead and Lag
The lead function allows us to compare the current row with subsequent rows within each partition. The second argument (the offset) defaults to 1, meaning the next row, but it can be set to 2 to look two rows ahead, and so on. The third parameter is the default value to be returned when no subsequent row exists; if it is not given, null is returned.
The lag function allows us to compare the current row with preceding rows within each partition. The second argument (the offset) defaults to 1, meaning the previous row, but it can be set to 2 to look two rows back, and so on. The third parameter is the default value to be returned when no preceding row exists; if it is not given, null is returned.
public class WindowDataFrame {

    public static void main(String[] args) {

        SparkSession session = SparkSession.builder()
                .appName("window").master("local").getOrCreate();
        session.sparkContext().setLogLevel("ERROR");

        Dataset<Row> emp = session.read()
                .csv("C:\\codebase\\scala-project\\inputdata\\small_sample")
                .toDF("name", "dep", "sal");

        WindowSpec window = Window.partitionBy(col("dep"))
                .orderBy(col("sal").desc());

        // Previous and next salary within each department, defaulting to 0
        Column lag = lag(col("sal"), 1, 0).over(window);
        Column lead = lead(col("sal"), 1, 0).over(window);
        emp.select(col("name"), col("dep"), col("sal"),
                lag.as("lag"), lead.as("lead")).show();
    }
}
Output
+-------+------+------+------+------+
|   name|   dep|   sal|   lag|  lead|
+-------+------+------+------+------+
|  COLIN| WATER|800.00|     0|700.00|
|BLLISON| WATER|700.00|800.00|100.00|
|  POMMY| WATER|100.00|700.00|     0|
|MICHAEL|POLICE|600.00|     0|500.00|
|  BRUNO|POLICE|500.00|600.00|300.00|
|   DREG|POLICE|300.00|500.00|200.00|
|   SUMO|POLICE|200.00|300.00|     0|
|ALLISON|  FIRE|900.00|     0|900.00|
| CLOVIN|  FIRE|900.00|900.00|400.00|
|  SUMAN|  FIRE|400.00|900.00|300.00|
|  RAMBO|  FIRE|300.00|400.00|     0|
+-------+------+------+------+------+
Let's solve a problem with lag: how to subtract the salary values of successive rows. With each department ordered by salary ascending, for the WATER department we need to compute 700-100, 800-700, and so on.
public class WindowDataFrame {

    public static void main(String[] args) {

        SparkSession session = SparkSession.builder()
                .appName("window").master("local").getOrCreate();
        session.sparkContext().setLogLevel("ERROR");

        Dataset<Row> emp = session.read()
                .csv("C:\\codebase\\scala-project\\inputdata\\small_sample")
                .toDF("name", "dep", "sal");

        WindowSpec window = Window.partitionBy(col("dep"))
                .orderBy(col("sal").asc());

        // Difference between the current salary and the previous one
        Column lag = lag(col("sal"), 1, 0).over(window);
        emp.select(col("name"), col("dep"), col("sal"),
                col("sal").minus(lag).as("lag")).show();
    }
}
Output
+-------+------+------+-----+
|   name|   dep|   sal|  lag|
+-------+------+------+-----+
|  POMMY| WATER|100.00|100.0|
|BLLISON| WATER|700.00|600.0|
|  COLIN| WATER|800.00|100.0|
|   SUMO|POLICE|200.00|200.0|
|   DREG|POLICE|300.00|100.0|
|  BRUNO|POLICE|500.00|200.0|
|MICHAEL|POLICE|600.00|100.0|
|  RAMBO|  FIRE|300.00|300.0|
|  SUMAN|  FIRE|400.00|100.0|
|ALLISON|  FIRE|900.00|500.0|
| CLOVIN|  FIRE|900.00|  0.0|
+-------+------+------+-----+
Using Aggregate Functions as Window Functions
Let's calculate the running sum of salary using the sum aggregate function with a window.
public class WindowDataFrame {

    public static void main(String[] args) {

        SparkSession session = SparkSession.builder()
                .appName("window").master("local").getOrCreate();
        session.sparkContext().setLogLevel("ERROR");

        Dataset<Row> emp = session.read()
                .csv("C:\\codebase\\scala-project\\inputdata\\small_sample")
                .toDF("name", "dep", "sal");

        WindowSpec window = Window.partitionBy(col("dep"))
                .orderBy(col("sal").asc());

        // Running sum of salary within each department
        Column agg = sum("sal").over(window);
        emp.select(col("name"), col("dep"), col("sal"),
                agg.as("sum")).show();
    }
}
Output
+-------+------+------+------+
|   name|   dep|   sal|   sum|
+-------+------+------+------+
|  POMMY| WATER|100.00| 100.0|
|BLLISON| WATER|700.00| 800.0|
|  COLIN| WATER|800.00|1600.0|
|   SUMO|POLICE|200.00| 200.0|
|   DREG|POLICE|300.00| 500.0|
|  BRUNO|POLICE|500.00|1000.0|
|MICHAEL|POLICE|600.00|1600.0|
|  RAMBO|  FIRE|300.00| 300.0|
|  SUMAN|  FIRE|400.00| 700.0|
|ALLISON|  FIRE|900.00|2500.0|
| CLOVIN|  FIRE|900.00|2500.0|
+-------+------+------+------+
The same result can be obtained by using a row frame. Note that the start boundary (the preceding value) must always be less than or equal to the end boundary (the following value).
WindowSpec window = Window.partitionBy(col("dep"))
        .orderBy(col("sal").asc())
        .rowsBetween(-Integer.MAX_VALUE, 0);

Column agg = sum("sal").over(window);
emp.select(col("name"), col("dep"), col("sal"), agg).show();
This gives the same result as below:
+-------+------+------+--------+
|   name|   dep|   sal|sum(sal)|
+-------+------+------+--------+
|  POMMY| WATER|100.00|   100.0|
|BLLISON| WATER|700.00|   800.0|
|  COLIN| WATER|800.00|  1600.0|
|   SUMO|POLICE|200.00|   200.0|
|   DREG|POLICE|300.00|   500.0|
|  BRUNO|POLICE|500.00|  1000.0|
|MICHAEL|POLICE|600.00|  1600.0|
|  RAMBO|  FIRE|300.00|   300.0|
|  SUMAN|  FIRE|400.00|   700.0|
|ALLISON|  FIRE|900.00|  1600.0|
| CLOVIN|  FIRE|900.00|  2500.0|
+-------+------+------+--------+
If we use the code below, the frame considered is ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, so each output value is the preceding row's salary + the current row's salary + the following row's salary.
WindowSpec window = Window.partitionBy(col("dep"))
        .orderBy(col("sal").asc())
        .rowsBetween(-1, 1);
Output
+-------+------+------+--------+
|   name|   dep|   sal|sum(sal)|
+-------+------+------+--------+
|  POMMY| WATER|100.00|   800.0|
|BLLISON| WATER|700.00|  1600.0|
|  COLIN| WATER|800.00|  1500.0|
|   SUMO|POLICE|200.00|   500.0|
|   DREG|POLICE|300.00|  1000.0|
|  BRUNO|POLICE|500.00|  1400.0|
|MICHAEL|POLICE|600.00|  1100.0|
|  RAMBO|  FIRE|300.00|   700.0|
|  SUMAN|  FIRE|400.00|  1600.0|
|ALLISON|  FIRE|900.00|  2200.0|
| CLOVIN|  FIRE|900.00|  1800.0|
+-------+------+------+--------+
Let's calculate the running sum of salary in reverse order using the sum aggregate function with a window.
We will use the rowsBetween method, which defines a ROW frame based on physical offsets from the position of the current input row. The first parameter is the start boundary and the second parameter is the end boundary. In the code below, 0 is the current row and Long.MAX_VALUE is the end boundary, so the window function starts at the current row and sums the salaries of all the remaining rows inside the department partition.
public class WindowDataFrame {

    public static void main(String[] args) {

        SparkSession session = SparkSession.builder()
                .appName("window").master("local").getOrCreate();
        session.sparkContext().setLogLevel("ERROR");

        Dataset<Row> emp = session.read()
                .csv("C:\\codebase\\scala-project\\inputdata\\small_sample")
                .toDF("name", "dep", "sal");

        // Frame from the current row to the end of the partition
        WindowSpec window = Window.partitionBy(col("dep"))
                .orderBy(col("sal").asc())
                .rowsBetween(0, Long.MAX_VALUE);

        Column agg = sum("sal").over(window);
        emp.select(col("name"), col("dep"), col("sal"), agg).show();
    }
}
Output
+-------+------+------+--------+
|   name|   dep|   sal|sum(sal)|
+-------+------+------+--------+
|  POMMY| WATER|100.00|  1600.0|
|BLLISON| WATER|700.00|  1500.0|
|  COLIN| WATER|800.00|   800.0|
|   SUMO|POLICE|200.00|  1600.0|
|   DREG|POLICE|300.00|  1400.0|
|  BRUNO|POLICE|500.00|  1100.0|
|MICHAEL|POLICE|600.00|   600.0|
|  RAMBO|  FIRE|300.00|  2500.0|
|  SUMAN|  FIRE|400.00|  2200.0|
|ALLISON|  FIRE|900.00|  1800.0|
| CLOVIN|  FIRE|900.00|   900.0|
+-------+------+------+--------+
Let's use the max, min, avg, and mean aggregate functions with a window (mean is an alias for avg, so the two columns are identical).
public class WindowDataFrame {

    public static void main(String[] args) {

        SparkSession session = SparkSession.builder()
                .appName("window").master("local").getOrCreate();
        session.sparkContext().setLogLevel("ERROR");

        Dataset<Row> emp = session.read()
                .csv("C:\\codebase\\scala-project\\inputdata\\small_sample")
                .toDF("name", "dep", "sal");

        WindowSpec window = Window.partitionBy(col("dep"))
                .orderBy(col("sal").asc())
                .rowsBetween(0, Integer.MAX_VALUE);

        Column max = max("sal").over(window);
        Column min = min("sal").over(window);
        Column avg = avg("sal").over(window);
        Column mean = mean("sal").over(window);
        emp.select(col("name"), col("dep"), col("sal"),
                max.as("max"), min.as("min"),
                avg.as("avg"), mean.as("mean")).show();
    }
}
Output
+-------+------+------+------+------+-----------------+-----------------+
|   name|   dep|   sal|   max|   min|              avg|             mean|
+-------+------+------+------+------+-----------------+-----------------+
|  POMMY| WATER|100.00|800.00|100.00|533.3333333333334|533.3333333333334|
|BLLISON| WATER|700.00|800.00|700.00|            750.0|            750.0|
|  COLIN| WATER|800.00|800.00|800.00|            800.0|            800.0|
|   SUMO|POLICE|200.00|600.00|200.00|            400.0|            400.0|
|   DREG|POLICE|300.00|600.00|300.00|466.6666666666667|466.6666666666667|
|  BRUNO|POLICE|500.00|600.00|500.00|            550.0|            550.0|
|MICHAEL|POLICE|600.00|600.00|600.00|            600.0|            600.0|
|  RAMBO|  FIRE|300.00|900.00|300.00|            625.0|            625.0|
|  SUMAN|  FIRE|400.00|900.00|400.00|733.3333333333334|733.3333333333334|
|ALLISON|  FIRE|900.00|900.00|900.00|            900.0|            900.0|
| CLOVIN|  FIRE|900.00|900.00|900.00|            900.0|            900.0|
+-------+------+------+------+------+-----------------+-----------------+