Skip to content

Commit

Permalink
[SPARK-32934][SQL] Improve the performance for NTH_VALUE and reactor …
Browse files Browse the repository at this point in the history
…the OffsetWindowFunction

### What changes were proposed in this pull request?
Spark SQL supports some window function like `NTH_VALUE`.
If we specify window frame like `UNBOUNDED PRECEDING AND CURRENT ROW` or `UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING`, we can elimate some calculations.
For example: if we execute the SQL show below:
```
SELECT NTH_VALUE(col,
         2) OVER(ORDER BY rank UNBOUNDED PRECEDING
        AND CURRENT ROW)
FROM tab;
```
The output for row number greater than 1, return the fixed value. otherwise, return null. So we just calculate the value once and notice whether the row number less than 2.
`UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING` is simpler.

### Why are the changes needed?
Improve the performance for `NTH_VALUE`, `FIRST_VALUE` and `LAST_VALUE`.

### Does this PR introduce _any_ user-facing change?
 'No'.

### How was this patch tested?
Jenkins test.

Closes apache#29800 from beliefer/optimize-nth_value.

Lead-authored-by: gengjiaan <[email protected]>
Co-authored-by: beliefer <[email protected]>
Co-authored-by: Jiaan Geng <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
2 people authored and cloud-fan committed Oct 28, 2020
1 parent 9fb4536 commit 3c3ad5f
Show file tree
Hide file tree
Showing 9 changed files with 353 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2978,7 +2978,7 @@ class Analyzer(
*/
object ResolveWindowFrame extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
case WindowExpression(wf: OffsetWindowFunction,
case WindowExpression(wf: FrameLessOffsetWindowFunction,
WindowSpecDefinition(_, _, f: SpecifiedWindowFrame)) if wf.frame != f =>
failAnalysis(s"Cannot specify window frame for ${wf.prettyName} function")
case WindowExpression(wf: WindowFunction, WindowSpecDefinition(_, _, f: SpecifiedWindowFrame))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ trait CheckAnalysis extends PredicateHelper {
case w @ WindowExpression(AggregateExpression(_, _, true, _, _), _) =>
failAnalysis(s"Distinct window functions are not supported: $w")

case w @ WindowExpression(_: OffsetWindowFunction,
case w @ WindowExpression(_: FrameLessOffsetWindowFunction,
WindowSpecDefinition(_, order, frame: SpecifiedWindowFrame))
if order.isEmpty || !frame.isOffset =>
failAnalysis("An offset window function can only be evaluated in an ordered " +
Expand All @@ -176,7 +176,8 @@ trait CheckAnalysis extends PredicateHelper {
// Only allow window functions with an aggregate expression or an offset window
// function or a Pandas window UDF.
e match {
case _: AggregateExpression | _: OffsetWindowFunction | _: AggregateWindowFunction =>
case _: AggregateExpression | _: FrameLessOffsetWindowFunction |
_: AggregateWindowFunction =>
w
case f: PythonUDF if PythonUDF.isWindowPandasUDF(f) =>
w
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,25 +327,14 @@ object WindowFunctionType {
}
}


/**
* An offset window function is a window function that returns the value of the input column offset
* by a number of rows within the partition. For instance: an OffsetWindowfunction for value x with
* offset -2, will get the value of x 2 rows back in the partition.
*/
abstract class OffsetWindowFunction
extends Expression with WindowFunction with Unevaluable with ImplicitCastInputTypes {
trait OffsetWindowSpec extends Expression {
/**
* Input expression to evaluate against a row which a number of rows below or above (depending on
* the value and sign of the offset) the current row.
* the value and sign of the offset) the starting row (current row if isRelative=true, or the
* first row of the window frame otherwise).
*/
val input: Expression

/**
* Default result value for the function when the `offset`th row does not exist.
*/
val default: Expression

/**
* (Foldable) expression that contains the number of rows between the current row and the row
* where the input expression is evaluated. If `offset` is a positive integer, it means that
Expand All @@ -355,6 +344,36 @@ abstract class OffsetWindowFunction
*/
val offset: Expression

/**
* Default result value for the function when the `offset`th row does not exist.
*/
val default: Expression

/**
* An optional specification that indicates the offset window function should skip null values in
* the determination of which row to use.
*/
val ignoreNulls: Boolean

/**
* Whether the offset is starts with the current row. If `isRelative` is true, `offset` means
* the offset is start with the current row. otherwise, the offset is starts with the first
* row of the entire window frame.
*/
val isRelative: Boolean

lazy val fakeFrame = SpecifiedWindowFrame(RowFrame, offset, offset)
}

/**
* A frameless offset window function is a window function that cannot specify window frame and
* returns the value of the input column offset by a number of rows within the partition.
* For instance: a FrameLessOffsetWindowFunction for value x with offset -2, will get the value of
* x 2 rows back in the partition.
*/
abstract class FrameLessOffsetWindowFunction
extends WindowFunction with OffsetWindowSpec with Unevaluable with ImplicitCastInputTypes {

override def children: Seq[Expression] = Seq(input, offset, default)

/*
Expand All @@ -370,7 +389,11 @@ abstract class OffsetWindowFunction

override def nullable: Boolean = default == null || default.nullable || input.nullable

override lazy val frame: WindowFrame = SpecifiedWindowFrame(RowFrame, offset, offset)
override val ignoreNulls = false

override val isRelative = true

override lazy val frame: WindowFrame = fakeFrame

override def checkInputDataTypes(): TypeCheckResult = {
val check = super.checkInputDataTypes()
Expand Down Expand Up @@ -425,7 +448,7 @@ abstract class OffsetWindowFunction
group = "window_funcs")
// scalastyle:on line.size.limit line.contains.tab
case class Lead(input: Expression, offset: Expression, default: Expression)
extends OffsetWindowFunction {
extends FrameLessOffsetWindowFunction {

def this(input: Expression, offset: Expression) = this(input, offset, Literal(null))

Expand Down Expand Up @@ -467,7 +490,7 @@ case class Lead(input: Expression, offset: Expression, default: Expression)
group = "window_funcs")
// scalastyle:on line.size.limit line.contains.tab
case class Lag(input: Expression, inputOffset: Expression, default: Expression)
extends OffsetWindowFunction {
extends FrameLessOffsetWindowFunction {

def this(input: Expression, offset: Expression) = this(input, offset, Literal(null))

Expand Down Expand Up @@ -579,7 +602,6 @@ case class CumeDist() extends RowNumberLike with SizeBasedWindowFunction {
}

// scalastyle:off line.size.limit line.contains.tab

@ExpressionDescription(
usage = """
_FUNC_(input[, offset]) - Returns the value of `input` at the row that is the `offset`th row
Expand Down Expand Up @@ -607,12 +629,16 @@ case class CumeDist() extends RowNumberLike with SizeBasedWindowFunction {
since = "3.1.0",
group = "window_funcs")
// scalastyle:on line.size.limit line.contains.tab
case class NthValue(input: Expression, offsetExpr: Expression, ignoreNulls: Boolean)
extends AggregateWindowFunction with ImplicitCastInputTypes {
case class NthValue(input: Expression, offset: Expression, ignoreNulls: Boolean)
extends AggregateWindowFunction with OffsetWindowSpec with ImplicitCastInputTypes {

def this(child: Expression, offset: Expression) = this(child, offset, false)

override def children: Seq[Expression] = input :: offsetExpr :: Nil
override lazy val default = Literal.create(null, input.dataType)

override val isRelative = false

override def children: Seq[Expression] = input :: offset :: Nil

override val frame: WindowFrame = UnspecifiedFrame

Expand All @@ -624,35 +650,35 @@ case class NthValue(input: Expression, offsetExpr: Expression, ignoreNulls: Bool
val check = super.checkInputDataTypes()
if (check.isFailure) {
check
} else if (!offsetExpr.foldable) {
TypeCheckFailure(s"Offset expression '$offsetExpr' must be a literal.")
} else if (offset <= 0) {
} else if (!offset.foldable) {
TypeCheckFailure(s"Offset expression '$offset' must be a literal.")
} else if (offsetVal <= 0) {
TypeCheckFailure(
s"The 'offset' argument of nth_value must be greater than zero but it is $offset.")
s"The 'offset' argument of nth_value must be greater than zero but it is $offsetVal.")
} else {
TypeCheckSuccess
}
}

private lazy val offset = offsetExpr.eval().asInstanceOf[Int].toLong
private lazy val offsetVal = offset.eval().asInstanceOf[Int].toLong
private lazy val result = AttributeReference("result", input.dataType)()
private lazy val count = AttributeReference("count", LongType)()
override lazy val aggBufferAttributes: Seq[AttributeReference] = result :: count :: Nil

override lazy val initialValues: Seq[Literal] = Seq(
/* result = */ Literal.create(null, input.dataType),
/* result = */ default,
/* count = */ Literal(1L)
)

override lazy val updateExpressions: Seq[Expression] = {
if (ignoreNulls) {
Seq(
/* result = */ If(count === offset && input.isNotNull, input, result),
/* result = */ If(count === offsetVal && input.isNotNull, input, result),
/* count = */ If(input.isNull, count, count + 1L)
)
} else {
Seq(
/* result = */ If(count === offset, input, result),
/* result = */ If(count === offsetVal, input, result),
/* count = */ count + 1L
)
}
Expand All @@ -662,7 +688,7 @@ case class NthValue(input: Expression, offsetExpr: Expression, ignoreNulls: Bool

override def prettyName: String = "nth_value"
override def sql: String =
s"$prettyName(${input.sql}, ${offsetExpr.sql})${if (ignoreNulls) " ignore nulls" else ""}"
s"$prettyName(${input.sql}, ${offset.sql})${if (ignoreNulls) " ignore nulls" else ""}"
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,12 @@ import org.apache.spark.sql.types.{CalendarIntervalType, DateType, IntegerType,
* 3. CURRENT ROW AND 1 FOLLOWING
* 4. 1 PRECEDING AND 1 FOLLOWING
* 5. 1 FOLLOWING AND 2 FOLLOWING
* - Offset frame: The frame consist of one row, which is an offset number of rows away from the
* current row. Only [[OffsetWindowFunction]]s can be processed in an offset frame.
* - Offset frame: The frame consist of one row, which is an offset number of rows. There are three
* implement of offset frame.
* 1. [[FrameLessOffsetWindowFunction]] returns the value of the input column offset by a number
* of rows according to the current row.
* 2. [[UnboundedOffsetWindowFunctionFrame]] and [[UnboundedPrecedingOffsetWindowFunctionFrame]]
* returns the value of the input column offset by a number of rows within the frame.
*
* Different frame boundaries can be used in Growing, Shrinking and Moving frames. A frame
* boundary can be either Row or Range based:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,15 @@ trait WindowExecBase extends UnaryExecNode {
val frame = spec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
function match {
case AggregateExpression(f, _, _, _, _) => collect("AGGREGATE", frame, e, f)
case f: FrameLessOffsetWindowFunction => collect("FRAME_LESS_OFFSET", frame, e, f)
case f: OffsetWindowSpec if !f.ignoreNulls &&
frame.frameType == RowFrame && frame.lower == UnboundedPreceding =>
frame.upper match {
case UnboundedFollowing => collect("UNBOUNDED_OFFSET", f.fakeFrame, e, f)
case CurrentRow => collect("UNBOUNDED_PRECEDING_OFFSET", f.fakeFrame, e, f)
case _ => collect("AGGREGATE", frame, e, f)
}
case f: AggregateWindowFunction => collect("AGGREGATE", frame, e, f)
case f: OffsetWindowFunction => collect("OFFSET", frame, e, f)
case f: PythonUDF => collect("AGGREGATE", frame, e, f)
case f => sys.error(s"Unsupported window function: $f")
}
Expand Down Expand Up @@ -171,18 +178,42 @@ trait WindowExecBase extends UnaryExecNode {

// Create the factory to produce WindowFunctionFrame.
val factory = key match {
// Offset Frame
case ("OFFSET", _, IntegerLiteral(offset), _) =>
// Frameless offset Frame
case ("FRAME_LESS_OFFSET", _, IntegerLiteral(offset), _) =>
target: InternalRow =>
new OffsetWindowFunctionFrame(
new FrameLessOffsetWindowFunctionFrame(
target,
ordinal,
// OFFSET frame functions are guaranteed be OffsetWindowFunctions.
functions.map(_.asInstanceOf[OffsetWindowFunction]),
// OFFSET frame functions are guaranteed be OffsetWindowSpec.
functions.map(_.asInstanceOf[OffsetWindowSpec]),
child.output,
(expressions, schema) =>
MutableProjection.create(expressions, schema),
offset)
case ("UNBOUNDED_OFFSET", _, IntegerLiteral(offset), _) =>
target: InternalRow => {
new UnboundedOffsetWindowFunctionFrame(
target,
ordinal,
// OFFSET frame functions are guaranteed be OffsetWindowSpec.
functions.map(_.asInstanceOf[OffsetWindowSpec]),
child.output,
(expressions, schema) =>
MutableProjection.create(expressions, schema),
offset)
}
case ("UNBOUNDED_PRECEDING_OFFSET", _, IntegerLiteral(offset), _) =>
target: InternalRow => {
new UnboundedPrecedingOffsetWindowFunctionFrame(
target,
ordinal,
// OFFSET frame functions are guaranteed be OffsetWindowSpec.
functions.map(_.asInstanceOf[OffsetWindowSpec]),
child.output,
(expressions, schema) =>
MutableProjection.create(expressions, schema),
offset)
}

// Entire Partition Frame.
case ("AGGREGATE", _, UnboundedPreceding, UnboundedFollowing) =>
Expand Down
Loading

0 comments on commit 3c3ad5f

Please sign in to comment.