top of page

Part 3: Deep Dive - Spark Window Functions

  • Writer: Rishaab
    Rishaab
  • Nov 22, 2024
  • 7 min read

Updated: Nov 29, 2024

In the previous post, we briefly alluded about AggregateProcessor without going into the depth. In this part, we will focus on its implementation details.


To set things up, an AggregateProcessor is a entity that facilitates in computation of an aggregate operation, like SUM, RANK, etc. In order to ensure high efficiency in a scalable system like Spark, this aggregation process sacrifices immutability, a fundamental concept in the Scala language. AggregateProcessor leverages mutable entities to achieve optimal performance, and three most prominent entities leveraged by AggregateProcessor are - SpecificInternalRow, MutableProjection and JoinedRow. We should spent some time understanding them first.




SpecificInternalRow

A SpecificInternalRow is a specialized row representation that internally uses MutableValue types.


A MutableValue is of specific type, like Int, Boolean, etc. and allows in-place updates of values. These in-place updates minimizes the object creation and hence reduce the impact of garbage collection. The SpecificInternalRow is a container for the array of MutableValue, where their types can be derived by the schema.


For e.g., SpecificInternalRow accepts a sequence of data types, and for each type it creates a corresponding MutableValue. Here is the code snippet for the same,

 def this(dataTypes: Seq[DataType]) = {
    this(new Array[MutableValue](dataTypes.length))
    var i = 0
    dataTypes.foreach { dt =>
      values(i) = dataTypeToMutableValue(dt)
      i += 1
    }
  }



MutableProjection

A MutableProjection is an abstract implementation and responsible for evaluating a series of expressions over the provided input row and write the result to the output row, called target. The target row can be specified by calling this target() method.


The mutable nature enables evaluating expressions frequently and writing the result to the same target row without the overhead of creating new row every time.


There are two variants of MutableProjection, InterpretedMutableProjection GeneratedMutableProjection


The former evaluates expressions using interpreted style while the latter performs code-generation to evaluates expressions.


Remember, from Part 1, we disabled the code generation by setting the below property, hence in our case the InterpretedMutableProjection will be called.

spark.conf.set("spark.sql.codegen.factoryMode", "NO_CODEGEN")

Moving forward, whenever we mention MutableProjection, we will implicitly refer to InterpretedMutableProjection.


We won't dedicate a significant amount of time on InterpretedMutableProjection, however, following points should be noted about it,




JoinedRow

A JoinedRow acts like two rows concatenated together (logically) and provides convenient way to lookup their values. A JoinedRow has left and right row. The statement join(buffer, input) creates a JoinedRow with left row being buffer and right row being input.


JoinedRow is used by AggregatedProcessor to evaluate certain expressions on two rows together and compute the aggregated result. We will learn more about this later.


In a JoinedRow, the indices are zero-based, starting with the left row index, followed by the right row.


To comprehend how the values are indexed in a join row, consider the following,

  • A left row of type SpecificInternalRow with 3 elements: [Int, Int, String] represented as [1, 1, "a"].

  • The right row of type UnsafeRow (refer to Part 2 for UnsafeRow) for a row (2, "a1") encoded as [0,2,1800000002,3161] . It contains 2 elements.


The JoinedRow for the above will be represented as,

joinedRow{ [0, 1, "a"], [0,2,1800000002,3161] }

The joinedRow[2] will retrieve the string value "a" from the left row, ie. 2nd index starting from left row.

The joinedRow[3] will retrieve the integer value 2 from the right row at index position 0, because the left row's size permits indexing only up to the 2nd index, so the 3rd index aligns with the 0th index in the right row.

Similarly, the joinedRow[4] will retrieve the string value 3161 (i.e., "a1") from the right row.




AggregateProcessor

AggregateProcessor works by evaluating a series of expressions over the input row to compute the aggregated result.


The aggregated result is stored in a variable called buffer which is of type SpecificInternalRow (discussed above).


We will now understand how this buffer is initialized and updated, and how the final aggregated result is fetched from the buffer.



  • Initializing buffer

The buffer initialization is done by the initialize(size: Int) method.


This method uses initialProjection of type MutableProjection to initialize its buffer.


For our example code from Part 1 to calculate rank(), the expressions used for initializing the buffer will look like so,

exprs: ArrayBuffer (size = 3)
├── Literal: 0
├── Literal: 1
└── Literal: null

The below code will evaluate the above expression and set the evaluated value to the buffer.

initialProjection(buffer)

For the rank() function, this expression will initialize the buffer to,

buffer[0] = 0
buffer[1] = 1
buffer[2] = null	


  • Updating buffer from input row

After, initializing the buffer, the AggregateProcessor is ready to process the provided inputRow. The update(inputRow: InternalRow) method is called for this purpose.


The updateProjection is responsible for evaluating a series of expression and update the buffer.


For our rank() example code, the following expressions will be evaluated against the inputRow.

exprs: ArrayBuffer (size = 3)
├── if
│   ├── Condition: 
│   │   ├── ((input[4, string, true] <=> input[2, string, true])
│   │   └── AND NOT (input[0, int, false] = 0))
│   ├── True branch: input[0, int, false]
│   └── False branch: input[1, int, false]
├── Add
│   └── (input[1, int, false] + 1)
└── BoundReference
    └── input[4, string, true]
Note: Above expressions need to be evaluated, with each expression updating the buffer value at its respective index. For instance, the first If expression updates buffer[0], the second Add expression updates buffer[1], and so forth.


The below code will execute the above expressions over the JoinedRow with left row being the buffer and right row being inputRow (of type UnsafeRow) for our example dataset in Part 1.

updateProjection(join(buffer, inputRow))

Let's run these expressions on the first dataset row, (1, "a1").


In this case, the buffer will be [0, 1, null], which is the initial value since we haven't begun evaluating any rank yet.

The inputRow row will be [0, 1, 1800000002, 3161], representing the UnsafeRow for the dataset row (1, "a1").

Consequently, the JoinedRow will appear as {[0, 1, null], [0, 1, 1800000002, 3161]}.



Let's understand the expressions evaluation,

├── if
│   ├── Condition: 
│   │   ├── ((input[4, string, true] <=> input[2, string, true])
│   │   └── AND NOT (input[0, int, false] = 0))
│   ├── True branch: input[0, int, false]
│   └── False branch: input[1, int, false]
  • The <=> operator is the null-safe equality comparison in Spark. It checks whether two inputs are equal, even if one or both values are null. If both values are null, it treats them as equal.

  • input[4, string, true]:

    • This retrieves the string value at the 4th index of the JoinedRow.

    • The true indicates that the string can have a null value.

    • In the context of a JoinedRow, this value is fetched from the right row which is the inputRow. Specifically, it corresponds to the col2 column of the example dataset, which has the value "a1" in this example.

  • input[2, string, true]:

    • This retrieves the string value at the 2nd index of the JoinedRow.

    • The true also allows this value to be null.

    • This value is fetched from the left row, which is the buffer. In this example, the value is null because no previous col2 value has been seen.

  • input[0, int, false] = 0:

    • This expression checks if this is the first time that the rank getting calculated.

    • input[0, int, false] retrieves the integer value at the 0th index of the JoinedRow. The false means this value cannot be null.

    • In our example, the condition evaluates to true because it's the first row getting ranked and hence, buffer[0] has value 0.

  • The condition as a whole checks that the values of input[4] and input[2] are equal (or both null), and that the rank being calculated is not the first rank (checked via NOT (input[0] = 0)).

    • If true, then there is a tie (input[4] = input[2]), and hence fetch the existing rank present from buffer at 0th index (input[0, int, false]). Meaning, this inputRow will be assigned the last rank. The value at buffer[0] will not be updated.

    • If false, then there is either no tie or this is the first rank being assigned, hence, fetch the next rank from the buffer at 1st index (input[1, int, false]). Meaning, the inputRow will be assigned the next rank, which is 1. The value of buffer[0] will be set to the value of buffer[1].


├── Add
│   └── (input[1, int, false] + 1)
  • It calculates the next rank by increasing buffer[1] by 1. Consequently, buffer[1] will become 2, signifying the potential rank to be allocated to the next inputRow.


BoundReference
    └── input[4, string, true]
  • The value of col2 from the right row is retrieved and assigned to buffer[2]. Hence, buffer[2] will be set to "a1". This value will then be compared with the next inputRow col2 value to determine the rank.



After these evaluations, the buffer is set to [1, 2, "a1"].

The value at 0th index is the rank to be assigned to the inputRow.

The value at 1st index is the potential next rank to be assigned to the next inputRow.

The value at 2nd position is the value of col2 to compare with the next inputRow.




  • Evaluating and returning result

Now that the buffer has been updated with the rank details, the AggregateProcessor must perform final expression evaluation and return the rank for that inputRow in the desired format.


For this, the caller must invoke evaluate(target: InternalRow) method and it's implemented like so,

def evaluate(target: InternalRow): Unit = {
	evaluateProjection.target(target)(buffer)
}

The target is the output row where the final evaluated result will be written.

The target is provided by the WindowFunctionFrame and set by invoking evaluateProjection.target(target).


Then, the evaluateProjection.apply(buffer) is implicitly called which evaluates the expressions over the buffer and write the final result to the target output row.


For our rank() function, the below expression will be evaluated.

BoundReference
    └── input[0, int, false]

It simply retrieves the integer at the 0th index from the buffer, representing the currently computed rank. Consequently, the result returned will be [1], indicating the rank for the row (1, "a1").




This concludes the full processing of the AggregateProcessor. In the next section, we will focus on the UnboundedPrecedingWindowFunctionFrame and examine how the AggregateProcessor integrates with the framework. Until then, happy learning :-)



Recent Posts

See All

Comments


Thanks for submitting!

©2023 by Rishab Joshi.

bottom of page