
Part 2: Deep Dive - Spark Window Functions

  • Writer: Rishaab
  • Nov 19, 2024
  • 5 min read

Updated: Nov 30, 2024

This is Part 2 of a deep-dive series on the internals of Apache Spark Window Functions.


Following our discussion of Spark's rank() window function in Part 1, we will now explore the detailed implementation of Apache Spark's Window function at the code level.



InternalRows

Before we dive into the internals of Window functions, we should briefly understand the concept of an InternalRow.


An InternalRow is the representation Spark uses internally during execution to process rows efficiently. Spark has several InternalRow implementations, and one of them is UnsafeRow.


An UnsafeRow is an efficient, compact byte representation of a row that keeps its values in raw memory instead of as JVM objects. The UnsafeRow string representations of the test dataset rows from Part 1 look like this:

Dataset Row    UnsafeRow
(1, "a1")      [0,1,1800000002,3161]
(2, "b1")      [0,2,1800000002,3162]
(1, "a2")      [0,1,1800000002,3261]
(2, "b2")      [0,2,1800000002,3262]

The UnsafeRow has three components: [null_bit_set, fixed_length_region, variable_length_region]


A bit in the first component, null_bit_set, is set to 1 if the corresponding column value in the row is null, and to 0 otherwise.

The fixed_length_region holds one 8-byte word per column. Values of fixed-length data types, such as integer, are stored directly in that word.

For variable-length types such as string, the column's word in the fixed_length_region holds a long value that combines the starting offset of the actual data and its length. The actual data is stored at that offset, in the variable_length_region.


For the representation [0,1,1800000002,3161], each value is one 8-byte word printed in hexadecimal. The first value is 0 because the row has no nulls. The second value is 1, which is the integer value 1 stored directly. The third value is the offset and length of the string combined: offset 0x18 (24 bytes from the start of the row) and length 2. The fourth value, 3161, is the UTF8String representation of "a1".
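To make the layout concrete, the printed words can be decoded by hand. The sketch below is plain Scala with no Spark dependency. The object and method names are hypothetical, and it assumes a little-endian machine (for example x86), where the string bytes are packed into the word starting from the lowest byte.

```scala
import java.nio.{ByteBuffer, ByteOrder}
import java.nio.charset.StandardCharsets

// Hypothetical helpers for reading the hex words printed for an UnsafeRow.
object UnsafeRowWords {
  // Upper 32 bits: offset from the start of the row. Lower 32 bits: length in bytes.
  def offsetAndSize(word: Long): (Int, Int) =
    ((word >>> 32).toInt, (word & 0xFFFFFFFFL).toInt)

  // Interpret the first `size` bytes of a little-endian 8-byte word as UTF-8 text.
  def decodeString(word: Long, size: Int): String = {
    val bytes = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(word).array()
    new String(bytes, 0, size, StandardCharsets.UTF_8)
  }
}
```

With these helpers, UnsafeRowWords.offsetAndSize(0x1800000002L) gives (24, 2). The offset of 24 is the 8-byte null bit set plus two 8-byte column words, and UnsafeRowWords.decodeString(0x3161L, 2) gives "a1".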


From now on, we will mostly work with the InternalRow representation. Refer to this table whenever you want to map an InternalRow back to its Dataset Row in later parts of this series.


A deep understanding of the UnsafeRow representation is beyond the scope of this series (it will be a topic for later). However, if you want to experiment and explore it yourself, you can use the code below to convert any external Row representation to its UnsafeRow representation.

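// Internal Spark APIs; locations as of Spark 3.x and may differ in other versions.
import org.apache.spark.sql.catalyst.encoders.encoderFor
// An implicit Encoder[(Int, String)] must be in scope, e.g. via spark.implicits._

val encoder = encoderFor[(Int, String)]
val toRow = encoder.createSerializer()
// copy() is needed because the serializer reuses the same row buffer across calls.
val internalRows = Seq(
  (1, "a1"),
  (2, "b1"),
  (1, "a2"),
  (2, "b2")).map { r => toRow(r).copy() }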

Note: The methods above are internal to Spark, so you need the Spark codebase up and running to try them out. For the encoder itself, see ExpressionEncoder in the Spark source.




WindowExec - Physical Plan Operator

WindowExec is the physical operator that handles the processing of Window functions in Spark SQL. Like every Spark physical operator, it indirectly inherits from SparkPlan. SparkPlan is the abstract physical operator in the Spark SQL engine, and all physical operators must inherit from it. The class hierarchy looks like so:

[Figure: WindowExec class hierarchy]


When we invoke the Spark action df.show() in our test example from Part 1, Spark triggers the physical plan execution. The doExecute() method of each physical operator in the plan is invoked. The doExecute() method is what generates an RDD of InternalRow, i.e. RDD[InternalRow].


Essentially, the sole responsibility of SQL physical plan execution is to generate the lineage of RDDs. This RDD lineage is what gets executed during job execution. At this point, however, Spark task execution has not begun. The task execution flow of Spark is a huge topic on its own, and this post presumes you have some familiarity with it. Here, we will focus solely on the execution flow of the Window function.




The doExecute()

As mentioned before, the WindowExec.doExecute() method generates the RDD for the Window function. This piece of code in Spark looks roughly like so:

child.execute().mapPartitions { stream =>
  // Window function closure logic here which returns
  // an Iterator.
}

The doExecute() method is invoked on the Spark driver. In the case of the Window function, all it does is return a new RDD of InternalRow that applies the window function closure to each partition (mapPartitions) of the child RDD.


A closure is simply a lambda function (in this case, window function logic) that is serializable and executed when an RDD is computed. Remember, doExecute() only returns the RDD and does not execute the closure.
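The difference between building the RDD and running the closure can be illustrated with a toy model. This is a minimal sketch in plain Scala: ToyRDD and LazyLineageDemo are hypothetical stand-ins with a single partition, not Spark classes, and the counter exists only to show when the closure body runs.

```scala
// Toy stand-in for an RDD with a single partition; NOT Spark's RDD class.
final class ToyRDD[T](computePartition: () => Iterator[T]) {
  // Builds a new ToyRDD that remembers `f`; nothing is executed yet.
  def mapPartitions[U](f: Iterator[T] => Iterator[U]): ToyRDD[U] =
    new ToyRDD[U](() => f(computePartition()))

  // Plays the role of a task computing the partition.
  def collect(): List[T] = computePartition().toList
}

object LazyLineageDemo {
  var closureRuns = 0 // counts how often the closure body executes

  // Plays the role of doExecute(): wraps the child and returns immediately.
  def doExecute(child: ToyRDD[Int]): ToyRDD[String] =
    child.mapPartitions { stream =>
      closureRuns += 1
      stream.map(i => s"row-$i")
    }
}
```

Calling LazyLineageDemo.doExecute(child) leaves closureRuns at 0. The counter only moves once collect() is called on the returned ToyRDD, which is the toy equivalent of a task computing the partition.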


The stream is a Scala Iterator of InternalRow. When a task executes the window function RDD, the stream provides an iterator over the output rows produced by the child operator. The Window function operator consumes this iterator to perform its processing.


Just as the Window function consumes the iterator provided by the child operator, it must also provide an iterator for its parent operator to consume. To do so, the window function closure simply returns an Iterator[InternalRow] when executed.


The interesting stuff happens when the parent operator pulls rows from this iterator. It is at this very moment that the window function machinery is driven. As you might have figured out by now, Apache Spark execution follows a pull-based execution model, also called the Volcano-style model. The meaty stuff happens inside the hasNext() and next() calls of the window function Iterator. We will come back to the iterator execution machinery later, as it requires more context to fully understand.
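The pull-based model can be sketched with an ordinary Scala Iterator. The example below is deliberately simplified and is not Spark's implementation: PullModelDemo and numberRows are hypothetical names, rows are plain tuples instead of InternalRow, and the input is assumed to arrive already sorted by key. It numbers the rows within each key, and all of the work happens inside next(), so nothing is read from the child until the parent asks for a row.

```scala
object PullModelDemo {
  type Row = (Int, String) // (partition key, value); a stand-in for InternalRow

  // Wraps the child's iterator (assumed sorted by key) and numbers rows within each key.
  def numberRows(child: Iterator[Row]): Iterator[(Int, String, Int)] =
    new Iterator[(Int, String, Int)] {
      private var currentKey: Option[Int] = None
      private var position = 0

      def hasNext: Boolean = child.hasNext

      def next(): (Int, String, Int) = {
        val (key, value) = child.next() // pull exactly one row from the child
        if (!currentKey.contains(key)) { // a new group of rows starts
          currentKey = Some(key)
          position = 0
        }
        position += 1
        (key, value, position)
      }
    }
}
```

Creating the iterator with PullModelDemo.numberRows(child) reads nothing from child. Each next() call from the parent pulls one row from the child and produces one output row.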




Processors and Frames

Before we dig into the execution of the Window function RDD discussed above, we should grok two components: WindowFunctionFrame and AggregateProcessor. Understanding them is crucial, as they form the backbone of Window function processing in Spark. A more in-depth explanation of these components will follow in upcoming posts. For now, let's establish a foundational understanding.



WindowFunctionFrame is an abstract class that represents the notion of a window frame in Spark. Spark offers a variety of frame types, and our example code from Part 1 specified one such frame, UnboundedPrecedingWindowFunctionFrame, using the syntax below:

import org.apache.spark.sql.expressions.Window

val w = Window
  .partitionBy("col1")
  .orderBy("col2")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

In the subsequent parts of this deep-dive series, we will focus on the implementation details of UnboundedPrecedingWindowFunctionFrame. Once we grasp how this frame works, the other frames will be easy to follow, as they work in a similar way.


Next, the AggregateProcessor is a class that facilitates the computation of an aggregate operation, like SUM, RANK, etc. A WindowFunctionFrame hands off the computation to the AggregateProcessor, which evaluates the window function expressions over a series of rows.
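The division of labour between the two can be sketched as follows. This is a toy model under stated assumptions, not Spark's code: every class and method name here is hypothetical, rows are reduced to a single Long value, and the only aggregate is a running sum. The frame decides which rows belong to the current row's window, and the processor holds the aggregation state.

```scala
// Hypothetical, simplified stand-ins; Spark's real classes have different signatures.
trait ToyAggregateProcessor {
  def initialize(): Unit        // reset state for a new partition
  def update(value: Long): Unit // fold one more row into the state
  def evaluate(): Long          // current aggregate result
}

final class SumProcessor extends ToyAggregateProcessor {
  private var sum = 0L
  def initialize(): Unit = { sum = 0L }
  def update(value: Long): Unit = { sum += value }
  def evaluate(): Long = sum
}

// Frame covering every row from the start of the partition up to the current row.
final class ToyUnboundedPrecedingFrame(processor: ToyAggregateProcessor) {
  private var rows: IndexedSeq[Long] = IndexedSeq.empty
  private var nextIndex = 0 // first row not yet handed to the processor

  // Called once per partition with that partition's rows, already ordered.
  def prepare(partitionRows: IndexedSeq[Long]): Unit = {
    rows = partitionRows
    nextIndex = 0
    processor.initialize()
  }

  // Result for the row at `index`; rows must be visited in increasing order.
  def write(index: Int): Long = {
    while (nextIndex <= index) {
      processor.update(rows(nextIndex))
      nextIndex += 1
    }
    processor.evaluate()
  }
}
```

Because the frame only ever grows towards the current row, each row is handed to the processor exactly once, and the result for a row is read off the running state rather than recomputed from the start of the partition.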



In the coming parts, we will examine these two entities in depth. Please go to the next part to continue the window function journey.




©2023 by Rishab Joshi.
