From 3c3ad5f7c00f6f68bc659d4cf7020fa944b7bc69 Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Wed, 28 Oct 2020 06:40:23 +0000 Subject: [PATCH] [SPARK-32934][SQL] Improve the performance for NTH_VALUE and refactor the OffsetWindowFunction ### What changes were proposed in this pull request? Spark SQL supports window functions such as `NTH_VALUE`. If the window frame is specified as `ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW` or `ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING`, we can eliminate some calculations. For example, if we execute the SQL shown below: ``` SELECT NTH_VALUE(col, 2) OVER(ORDER BY rank ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) FROM tab; ``` For every row whose row number is greater than 1, the output is the same fixed value; otherwise, the output is null. So we only need to calculate the value once and check whether the row number is less than 2. The `ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING` case is simpler. ### Why are the changes needed? Improve the performance for `NTH_VALUE`, `FIRST_VALUE` and `LAST_VALUE`. ### Does this PR introduce _any_ user-facing change? 'No'. ### How was this patch tested? Jenkins test. Closes #29800 from beliefer/optimize-nth_value. 
Lead-authored-by: gengjiaan Co-authored-by: beliefer Co-authored-by: Jiaan Geng Signed-off-by: Wenchen Fan --- .../sql/catalyst/analysis/Analyzer.scala | 2 +- .../sql/catalyst/analysis/CheckAnalysis.scala | 5 +- .../expressions/windowExpressions.scala | 86 +++++++----- .../sql/execution/window/WindowExec.scala | 8 +- .../sql/execution/window/WindowExecBase.scala | 43 +++++- .../window/WindowFunctionFrame.scala | 123 ++++++++++++++++-- .../resources/sql-tests/inputs/window.sql | 30 +++++ .../sql-tests/results/window.sql.out | 98 +++++++++++++- .../sql/DataFrameWindowFunctionsSuite.scala | 17 +-- 9 files changed, 353 insertions(+), 59 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 61c077fd12aa2..c2116a2b8f471 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -2978,7 +2978,7 @@ class Analyzer( */ object ResolveWindowFrame extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions { - case WindowExpression(wf: OffsetWindowFunction, + case WindowExpression(wf: FrameLessOffsetWindowFunction, WindowSpecDefinition(_, _, f: SpecifiedWindowFrame)) if wf.frame != f => failAnalysis(s"Cannot specify window frame for ${wf.prettyName} function") case WindowExpression(wf: WindowFunction, WindowSpecDefinition(_, _, f: SpecifiedWindowFrame)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 351be32ee438e..d261f26072bcc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -166,7 
+166,7 @@ trait CheckAnalysis extends PredicateHelper { case w @ WindowExpression(AggregateExpression(_, _, true, _, _), _) => failAnalysis(s"Distinct window functions are not supported: $w") - case w @ WindowExpression(_: OffsetWindowFunction, + case w @ WindowExpression(_: FrameLessOffsetWindowFunction, WindowSpecDefinition(_, order, frame: SpecifiedWindowFrame)) if order.isEmpty || !frame.isOffset => failAnalysis("An offset window function can only be evaluated in an ordered " + @@ -176,7 +176,8 @@ trait CheckAnalysis extends PredicateHelper { // Only allow window functions with an aggregate expression or an offset window // function or a Pandas window UDF. e match { - case _: AggregateExpression | _: OffsetWindowFunction | _: AggregateWindowFunction => + case _: AggregateExpression | _: FrameLessOffsetWindowFunction | + _: AggregateWindowFunction => w case f: PythonUDF if PythonUDF.isWindowPandasUDF(f) => w diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala index bc0b4ac018f9e..168585dc3de00 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala @@ -327,25 +327,14 @@ object WindowFunctionType { } } - -/** - * An offset window function is a window function that returns the value of the input column offset - * by a number of rows within the partition. For instance: an OffsetWindowfunction for value x with - * offset -2, will get the value of x 2 rows back in the partition. 
- */ -abstract class OffsetWindowFunction - extends Expression with WindowFunction with Unevaluable with ImplicitCastInputTypes { +trait OffsetWindowSpec extends Expression { /** * Input expression to evaluate against a row which a number of rows below or above (depending on - * the value and sign of the offset) the current row. + * the value and sign of the offset) the starting row (current row if isRelative=true, or the + * first row of the window frame otherwise). */ val input: Expression - /** - * Default result value for the function when the `offset`th row does not exist. - */ - val default: Expression - /** * (Foldable) expression that contains the number of rows between the current row and the row * where the input expression is evaluated. If `offset` is a positive integer, it means that @@ -355,6 +344,36 @@ abstract class OffsetWindowFunction */ val offset: Expression + /** + * Default result value for the function when the `offset`th row does not exist. + */ + val default: Expression + + /** + * An optional specification that indicates the offset window function should skip null values in + * the determination of which row to use. + */ + val ignoreNulls: Boolean + + /** + * Whether the offset is starts with the current row. If `isRelative` is true, `offset` means + * the offset is start with the current row. otherwise, the offset is starts with the first + * row of the entire window frame. + */ + val isRelative: Boolean + + lazy val fakeFrame = SpecifiedWindowFrame(RowFrame, offset, offset) +} + +/** + * A frameless offset window function is a window function that cannot specify window frame and + * returns the value of the input column offset by a number of rows within the partition. + * For instance: a FrameLessOffsetWindowFunction for value x with offset -2, will get the value of + * x 2 rows back in the partition. 
+ */ +abstract class FrameLessOffsetWindowFunction + extends WindowFunction with OffsetWindowSpec with Unevaluable with ImplicitCastInputTypes { + override def children: Seq[Expression] = Seq(input, offset, default) /* @@ -370,7 +389,11 @@ abstract class OffsetWindowFunction override def nullable: Boolean = default == null || default.nullable || input.nullable - override lazy val frame: WindowFrame = SpecifiedWindowFrame(RowFrame, offset, offset) + override val ignoreNulls = false + + override val isRelative = true + + override lazy val frame: WindowFrame = fakeFrame override def checkInputDataTypes(): TypeCheckResult = { val check = super.checkInputDataTypes() @@ -425,7 +448,7 @@ abstract class OffsetWindowFunction group = "window_funcs") // scalastyle:on line.size.limit line.contains.tab case class Lead(input: Expression, offset: Expression, default: Expression) - extends OffsetWindowFunction { + extends FrameLessOffsetWindowFunction { def this(input: Expression, offset: Expression) = this(input, offset, Literal(null)) @@ -467,7 +490,7 @@ case class Lead(input: Expression, offset: Expression, default: Expression) group = "window_funcs") // scalastyle:on line.size.limit line.contains.tab case class Lag(input: Expression, inputOffset: Expression, default: Expression) - extends OffsetWindowFunction { + extends FrameLessOffsetWindowFunction { def this(input: Expression, offset: Expression) = this(input, offset, Literal(null)) @@ -579,7 +602,6 @@ case class CumeDist() extends RowNumberLike with SizeBasedWindowFunction { } // scalastyle:off line.size.limit line.contains.tab - @ExpressionDescription( usage = """ _FUNC_(input[, offset]) - Returns the value of `input` at the row that is the `offset`th row @@ -607,12 +629,16 @@ case class CumeDist() extends RowNumberLike with SizeBasedWindowFunction { since = "3.1.0", group = "window_funcs") // scalastyle:on line.size.limit line.contains.tab -case class NthValue(input: Expression, offsetExpr: Expression, ignoreNulls: 
Boolean) - extends AggregateWindowFunction with ImplicitCastInputTypes { +case class NthValue(input: Expression, offset: Expression, ignoreNulls: Boolean) + extends AggregateWindowFunction with OffsetWindowSpec with ImplicitCastInputTypes { def this(child: Expression, offset: Expression) = this(child, offset, false) - override def children: Seq[Expression] = input :: offsetExpr :: Nil + override lazy val default = Literal.create(null, input.dataType) + + override val isRelative = false + + override def children: Seq[Expression] = input :: offset :: Nil override val frame: WindowFrame = UnspecifiedFrame @@ -624,35 +650,35 @@ case class NthValue(input: Expression, offsetExpr: Expression, ignoreNulls: Bool val check = super.checkInputDataTypes() if (check.isFailure) { check - } else if (!offsetExpr.foldable) { - TypeCheckFailure(s"Offset expression '$offsetExpr' must be a literal.") - } else if (offset <= 0) { + } else if (!offset.foldable) { + TypeCheckFailure(s"Offset expression '$offset' must be a literal.") + } else if (offsetVal <= 0) { TypeCheckFailure( - s"The 'offset' argument of nth_value must be greater than zero but it is $offset.") + s"The 'offset' argument of nth_value must be greater than zero but it is $offsetVal.") } else { TypeCheckSuccess } } - private lazy val offset = offsetExpr.eval().asInstanceOf[Int].toLong + private lazy val offsetVal = offset.eval().asInstanceOf[Int].toLong private lazy val result = AttributeReference("result", input.dataType)() private lazy val count = AttributeReference("count", LongType)() override lazy val aggBufferAttributes: Seq[AttributeReference] = result :: count :: Nil override lazy val initialValues: Seq[Literal] = Seq( - /* result = */ Literal.create(null, input.dataType), + /* result = */ default, /* count = */ Literal(1L) ) override lazy val updateExpressions: Seq[Expression] = { if (ignoreNulls) { Seq( - /* result = */ If(count === offset && input.isNotNull, input, result), + /* result = */ If(count === 
offsetVal && input.isNotNull, input, result), /* count = */ If(input.isNull, count, count + 1L) ) } else { Seq( - /* result = */ If(count === offset, input, result), + /* result = */ If(count === offsetVal, input, result), /* count = */ count + 1L ) } @@ -662,7 +688,7 @@ case class NthValue(input: Expression, offsetExpr: Expression, ignoreNulls: Bool override def prettyName: String = "nth_value" override def sql: String = - s"$prettyName(${input.sql}, ${offsetExpr.sql})${if (ignoreNulls) " ignore nulls" else ""}" + s"$prettyName(${input.sql}, ${offset.sql})${if (ignoreNulls) " ignore nulls" else ""}" } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala index eaca55df08d06..439c31a47fd3b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala @@ -57,8 +57,12 @@ import org.apache.spark.sql.types.{CalendarIntervalType, DateType, IntegerType, * 3. CURRENT ROW AND 1 FOLLOWING * 4. 1 PRECEDING AND 1 FOLLOWING * 5. 1 FOLLOWING AND 2 FOLLOWING - * - Offset frame: The frame consist of one row, which is an offset number of rows away from the - * current row. Only [[OffsetWindowFunction]]s can be processed in an offset frame. + * - Offset frame: The frame consist of one row, which is an offset number of rows. There are three + * implement of offset frame. + * 1. [[FrameLessOffsetWindowFunction]] returns the value of the input column offset by a number + * of rows according to the current row. + * 2. [[UnboundedOffsetWindowFunctionFrame]] and [[UnboundedPrecedingOffsetWindowFunctionFrame]] + * returns the value of the input column offset by a number of rows within the frame. * * Different frame boundaries can be used in Growing, Shrinking and Moving frames. 
A frame * boundary can be either Row or Range based: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala index ed055bb801ae5..f0b99c1522aa1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala @@ -136,8 +136,15 @@ trait WindowExecBase extends UnaryExecNode { val frame = spec.frameSpecification.asInstanceOf[SpecifiedWindowFrame] function match { case AggregateExpression(f, _, _, _, _) => collect("AGGREGATE", frame, e, f) + case f: FrameLessOffsetWindowFunction => collect("FRAME_LESS_OFFSET", frame, e, f) + case f: OffsetWindowSpec if !f.ignoreNulls && + frame.frameType == RowFrame && frame.lower == UnboundedPreceding => + frame.upper match { + case UnboundedFollowing => collect("UNBOUNDED_OFFSET", f.fakeFrame, e, f) + case CurrentRow => collect("UNBOUNDED_PRECEDING_OFFSET", f.fakeFrame, e, f) + case _ => collect("AGGREGATE", frame, e, f) + } case f: AggregateWindowFunction => collect("AGGREGATE", frame, e, f) - case f: OffsetWindowFunction => collect("OFFSET", frame, e, f) case f: PythonUDF => collect("AGGREGATE", frame, e, f) case f => sys.error(s"Unsupported window function: $f") } @@ -171,18 +178,42 @@ trait WindowExecBase extends UnaryExecNode { // Create the factory to produce WindowFunctionFrame. val factory = key match { - // Offset Frame - case ("OFFSET", _, IntegerLiteral(offset), _) => + // Frameless offset Frame + case ("FRAME_LESS_OFFSET", _, IntegerLiteral(offset), _) => target: InternalRow => - new OffsetWindowFunctionFrame( + new FrameLessOffsetWindowFunctionFrame( target, ordinal, - // OFFSET frame functions are guaranteed be OffsetWindowFunctions. - functions.map(_.asInstanceOf[OffsetWindowFunction]), + // OFFSET frame functions are guaranteed be OffsetWindowSpec. 
+ functions.map(_.asInstanceOf[OffsetWindowSpec]), child.output, (expressions, schema) => MutableProjection.create(expressions, schema), offset) + case ("UNBOUNDED_OFFSET", _, IntegerLiteral(offset), _) => + target: InternalRow => { + new UnboundedOffsetWindowFunctionFrame( + target, + ordinal, + // OFFSET frame functions are guaranteed be OffsetWindowSpec. + functions.map(_.asInstanceOf[OffsetWindowSpec]), + child.output, + (expressions, schema) => + MutableProjection.create(expressions, schema), + offset) + } + case ("UNBOUNDED_PRECEDING_OFFSET", _, IntegerLiteral(offset), _) => + target: InternalRow => { + new UnboundedPrecedingOffsetWindowFunctionFrame( + target, + ordinal, + // OFFSET frame functions are guaranteed be OffsetWindowSpec. + functions.map(_.asInstanceOf[OffsetWindowSpec]), + child.output, + (expressions, schema) => + MutableProjection.create(expressions, schema), + offset) + } // Entire Partition Frame. case ("AGGREGATE", _, UnboundedPreceding, UnboundedFollowing) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala index dc1b919feefe4..e8a83f9772d35 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala @@ -77,31 +77,31 @@ object WindowFunctionFrame { * @param newMutableProjection function used to create the projection. * @param offset by which rows get moved within a partition. 
*/ -final class OffsetWindowFunctionFrame( +abstract class OffsetWindowFunctionFrameBase( target: InternalRow, ordinal: Int, - expressions: Array[OffsetWindowFunction], + expressions: Array[OffsetWindowSpec], inputSchema: Seq[Attribute], newMutableProjection: (Seq[Expression], Seq[Attribute]) => MutableProjection, offset: Int) extends WindowFunctionFrame { /** Rows of the partition currently being processed. */ - private[this] var input: ExternalAppendOnlyUnsafeRowArray = null + protected var input: ExternalAppendOnlyUnsafeRowArray = null /** * An iterator over the [[input]] */ - private[this] var inputIterator: Iterator[UnsafeRow] = _ + protected var inputIterator: Iterator[UnsafeRow] = _ /** Index of the input row currently used for output. */ - private[this] var inputIndex = 0 + protected var inputIndex = 0 /** * Create the projection used when the offset row exists. * Please note that this project always respect null input values (like PostgreSQL). */ - private[this] val projection = { + protected val projection = { // Collect the expressions and bind them. val inputAttrs = inputSchema.map(_.withNullability(true)) val boundExpressions = Seq.fill(ordinal)(NoOp) ++ bindReferences( @@ -112,7 +112,7 @@ final class OffsetWindowFunctionFrame( } /** Create the projection used when the offset row DOES NOT exists. */ - private[this] val fillDefaultValue = { + protected val fillDefaultValue = { // Collect the expressions and bind them. 
val inputAttrs: AttributeSeq = inputSchema.map(_.withNullability(true)) val boundExpressions = Seq.fill(ordinal)(NoOp) ++ expressions.toSeq.map { e => @@ -129,6 +129,28 @@ final class OffsetWindowFunctionFrame( newMutableProjection(boundExpressions, Nil).target(target) } + override def currentLowerBound(): Int = throw new UnsupportedOperationException() + + override def currentUpperBound(): Int = throw new UnsupportedOperationException() +} + +/** + * The frameless offset window frame is an internal window frame just used to optimize the + * performance for the window function that returns the value of the input column offset + * by a number of rows according to the current row. The internal window frame is not a popular + * window frame cannot be specified and used directly by the users. This window frame + * calculates frames containing LEAD/LAG statements. + */ +class FrameLessOffsetWindowFunctionFrame( + target: InternalRow, + ordinal: Int, + expressions: Array[OffsetWindowSpec], + inputSchema: Seq[Attribute], + newMutableProjection: (Seq[Expression], Seq[Attribute]) => MutableProjection, + offset: Int) + extends OffsetWindowFunctionFrameBase( + target, ordinal, expressions, inputSchema, newMutableProjection, offset) { + override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = { input = rows inputIterator = input.generateIterator() @@ -151,10 +173,93 @@ final class OffsetWindowFunctionFrame( } inputIndex += 1 } +} - override def currentLowerBound(): Int = throw new UnsupportedOperationException() +/** + * The unbounded offset window frame is an internal window frame just used to optimize the + * performance for the window function that returns the value of the input column offset + * by a number of rows within the frame and has specified ROWS BETWEEN UNBOUNDED PRECEDING + * AND UNBOUNDED FOLLOWING. The internal window frame is not a popular window frame cannot be + * specified and used directly by the users. 
+ * The unbounded offset window frame calculates frames containing NTH_VALUE statements. + * The unbounded offset window frame return the same value for all rows in the window partition. + */ +class UnboundedOffsetWindowFunctionFrame( + target: InternalRow, + ordinal: Int, + expressions: Array[OffsetWindowSpec], + inputSchema: Seq[Attribute], + newMutableProjection: (Seq[Expression], Seq[Attribute]) => MutableProjection, + offset: Int) + extends OffsetWindowFunctionFrameBase( + target, ordinal, expressions, inputSchema, newMutableProjection, offset) { - override def currentUpperBound(): Int = throw new UnsupportedOperationException() + override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = { + input = rows + if (offset > input.length) { + fillDefaultValue(EmptyRow) + } else { + inputIterator = input.generateIterator() + // drain the first few rows if offset is larger than one + inputIndex = 0 + while (inputIndex < offset - 1) { + if (inputIterator.hasNext) inputIterator.next() + inputIndex += 1 + } + val r = WindowFunctionFrame.getNextOrNull(inputIterator) + projection(r) + } + } + + override def write(index: Int, current: InternalRow): Unit = { + // The results are the same for each row in the partition, and have been evaluated in prepare. + // Don't need to recalculate here. + } +} + +/** + * The unbounded preceding offset window frame is an internal window frame just used to optimize + * the performance for the window function that returns the value of the input column offset + * by a number of rows within the frame and has specified ROWS BETWEEN UNBOUNDED PRECEDING + * AND CURRENT ROW. The internal window frame is not a popular window frame cannot be specified + * and used directly by the users. + * The unbounded preceding offset window frame calculates frames containing NTH_VALUE statements. 
+ * The unbounded preceding offset window frame return the same value for rows which index + * (starting from 1) equal to or greater than offset in the window partition. + */ +class UnboundedPrecedingOffsetWindowFunctionFrame( + target: InternalRow, + ordinal: Int, + expressions: Array[OffsetWindowSpec], + inputSchema: Seq[Attribute], + newMutableProjection: (Seq[Expression], Seq[Attribute]) => MutableProjection, + offset: Int) + extends OffsetWindowFunctionFrameBase( + target, ordinal, expressions, inputSchema, newMutableProjection, offset) { + + var selectedRow: UnsafeRow = null + + override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = { + input = rows + inputIterator = input.generateIterator() + // drain the first few rows if offset is larger than one + inputIndex = 0 + while (inputIndex < offset - 1) { + if (inputIterator.hasNext) inputIterator.next() + inputIndex += 1 + } + if (inputIndex < input.length) { + selectedRow = WindowFunctionFrame.getNextOrNull(inputIterator) + } + } + + override def write(index: Int, current: InternalRow): Unit = { + if (index >= inputIndex && selectedRow != null) { + projection(selectedRow) + } else { + fillDefaultValue(EmptyRow) + } + } } /** diff --git a/sql/core/src/test/resources/sql-tests/inputs/window.sql b/sql/core/src/test/resources/sql-tests/inputs/window.sql index 5de6db210ce36..c1be5fb27e6fa 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/window.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/window.sql @@ -165,6 +165,16 @@ FROM basic_pays ORDER BY salary DESC; +SELECT + employee_name, + salary, + nth_value(employee_name, 2) OVER ( + ORDER BY salary DESC + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) second_highest_salary +FROM + basic_pays +ORDER BY salary DESC; + SELECT employee_name, salary, @@ -205,6 +215,26 @@ FROM basic_pays ORDER BY salary DESC; +SELECT + employee_name, + salary, + nth_value(employee_name, 2) OVER ( + ORDER BY salary DESC + ROWS BETWEEN UNBOUNDED PRECEDING AND 
UNBOUNDED FOLLOWING) second_highest_salary +FROM + basic_pays +ORDER BY salary DESC; + +SELECT + employee_name, + salary, + nth_value(employee_name, 2) OVER ( + ORDER BY salary DESC + ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) second_highest_salary +FROM + basic_pays +ORDER BY salary DESC; + SELECT employee_name, department, diff --git a/sql/core/src/test/resources/sql-tests/results/window.sql.out b/sql/core/src/test/resources/sql-tests/results/window.sql.out index 028dd7a12d25d..f6506a77e239c 100644 --- a/sql/core/src/test/resources/sql-tests/results/window.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/window.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 32 +-- Number of queries: 35 -- !query @@ -479,6 +479,38 @@ Anthony Bow 6627 Gerard Bondur Leslie Thompson 5186 Gerard Bondur +-- !query +SELECT + employee_name, + salary, + nth_value(employee_name, 2) OVER ( + ORDER BY salary DESC + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) second_highest_salary +FROM + basic_pays +ORDER BY salary DESC +-- !query schema +struct +-- !query output +Larry Bott 11798 NULL +Gerard Bondur 11472 Gerard Bondur +Pamela Castillo 11303 Gerard Bondur +Barry Jones 10586 Gerard Bondur +George Vanauf 10563 Gerard Bondur +Loui Bondur 10449 Gerard Bondur +Mary Patterson 9998 Gerard Bondur +Steve Patterson 9441 Gerard Bondur +Julie Firrelli 9181 Gerard Bondur +Jeff Firrelli 8992 Gerard Bondur +William Patterson 8870 Gerard Bondur +Diane Murphy 8435 Gerard Bondur +Leslie Jennings 8113 Gerard Bondur +Gerard Hernandez 6949 Gerard Bondur +Foon Yue Tseng 6660 Gerard Bondur +Anthony Bow 6627 Gerard Bondur +Leslie Thompson 5186 Gerard Bondur + + -- !query SELECT employee_name, @@ -607,6 +639,70 @@ Anthony Bow 6627 Gerard Bondur Leslie Thompson 5186 Gerard Bondur +-- !query +SELECT + employee_name, + salary, + nth_value(employee_name, 2) OVER ( + ORDER BY salary DESC + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) 
second_highest_salary +FROM + basic_pays +ORDER BY salary DESC +-- !query schema +struct +-- !query output +Larry Bott 11798 Gerard Bondur +Gerard Bondur 11472 Gerard Bondur +Pamela Castillo 11303 Gerard Bondur +Barry Jones 10586 Gerard Bondur +George Vanauf 10563 Gerard Bondur +Loui Bondur 10449 Gerard Bondur +Mary Patterson 9998 Gerard Bondur +Steve Patterson 9441 Gerard Bondur +Julie Firrelli 9181 Gerard Bondur +Jeff Firrelli 8992 Gerard Bondur +William Patterson 8870 Gerard Bondur +Diane Murphy 8435 Gerard Bondur +Leslie Jennings 8113 Gerard Bondur +Gerard Hernandez 6949 Gerard Bondur +Foon Yue Tseng 6660 Gerard Bondur +Anthony Bow 6627 Gerard Bondur +Leslie Thompson 5186 Gerard Bondur + + +-- !query +SELECT + employee_name, + salary, + nth_value(employee_name, 2) OVER ( + ORDER BY salary DESC + ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) second_highest_salary +FROM + basic_pays +ORDER BY salary DESC +-- !query schema +struct +-- !query output +Larry Bott 11798 Gerard Bondur +Gerard Bondur 11472 Gerard Bondur +Pamela Castillo 11303 Gerard Bondur +Barry Jones 10586 Gerard Bondur +George Vanauf 10563 Gerard Bondur +Loui Bondur 10449 Gerard Bondur +Mary Patterson 9998 Gerard Bondur +Steve Patterson 9441 Gerard Bondur +Julie Firrelli 9181 Gerard Bondur +Jeff Firrelli 8992 Gerard Bondur +William Patterson 8870 Gerard Bondur +Diane Murphy 8435 Gerard Bondur +Leslie Jennings 8113 Gerard Bondur +Gerard Hernandez 6949 Gerard Bondur +Foon Yue Tseng 6660 Gerard Bondur +Anthony Bow 6627 Gerard Bondur +Leslie Thompson 5186 Gerard Bondur + + -- !query SELECT employee_name, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala index 616e333033aa9..207b2963f0b3b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala @@ -657,15 
+657,16 @@ class DataFrameWindowFunctionsSuite extends QueryTest $"order", nth_value($"value", 2).over(window), nth_value($"value", 2, ignoreNulls = false).over(window), - nth_value($"value", 2, ignoreNulls = true).over(window)), + nth_value($"value", 2, ignoreNulls = true).over(window), + nth_value($"value", 3, ignoreNulls = false).over(window)), Seq( - Row("a", 0, null, null, null), - Row("a", 1, "x", "x", null), - Row("a", 2, "x", "x", "y"), - Row("a", 3, "x", "x", "y"), - Row("a", 4, "x", "x", "y"), - Row("b", 1, null, null, null), - Row("b", 2, null, null, null))) + Row("a", 0, null, null, null, null), + Row("a", 1, "x", "x", null, null), + Row("a", 2, "x", "x", "y", "y"), + Row("a", 3, "x", "x", "y", "y"), + Row("a", 4, "x", "x", "y", "y"), + Row("b", 1, null, null, null, null), + Row("b", 2, null, null, null, null))) } test("nth_value on descending ordered window") {