Skip to content

Commit 3ecc7d7

Browse files
chenkovsky authored and Nirnay Roy committed
fix: nested window function (apache#15033)
* fix: nested window function
* prevent stackoverflow
* Update select.rs
* Update sqllogictests.rs
* Update sql_api.rs
* Update select.rs
* Update sqllogictests.rs
* update slt
* update slt
* clippy
1 parent 19ac784 commit 3ecc7d7

File tree

3 files changed

+68
-22
lines changed

3 files changed

+68
-22
lines changed

datafusion/core/tests/sql/sql_api.rs

Lines changed: 17 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,23 @@ use datafusion::prelude::*;
1919

2020
use tempfile::TempDir;
2121

22+
#[tokio::test]
23+
async fn test_window_function() {
24+
let ctx = SessionContext::new();
25+
let df = ctx
26+
.sql(
27+
r#"SELECT
28+
t1.v1,
29+
SUM(t1.v1) OVER w + 1
30+
FROM
31+
generate_series(1, 10000) AS t1(v1)
32+
WINDOW
33+
w AS (ORDER BY t1.v1 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);"#,
34+
)
35+
.await;
36+
assert!(df.is_ok());
37+
}
38+
2239
#[tokio::test]
2340
async fn unsupported_ddl_returns_error() {
2441
// Verify SessionContext::with_sql_options errors appropriately

datafusion/sql/src/select.rs

Lines changed: 36 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
// under the License.
1717

1818
use std::collections::HashSet;
19+
use std::ops::ControlFlow;
1920
use std::sync::Arc;
2021

2122
use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
@@ -28,7 +29,7 @@ use crate::utils::{
2829

2930
use datafusion_common::error::DataFusionErrorBuilder;
3031
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
31-
use datafusion_common::{not_impl_err, plan_err, Result};
32+
use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result};
3233
use datafusion_common::{RecursionUnnestOption, UnnestOptions};
3334
use datafusion_expr::expr::{Alias, PlannedReplaceSelectItem, WildcardOptions};
3435
use datafusion_expr::expr_rewriter::{
@@ -45,8 +46,8 @@ use datafusion_expr::{
4546

4647
use indexmap::IndexMap;
4748
use sqlparser::ast::{
48-
Distinct, Expr as SQLExpr, GroupByExpr, NamedWindowExpr, OrderBy,
49-
SelectItemQualifiedWildcardKind, WildcardAdditionalOptions, WindowType,
49+
visit_expressions_mut, Distinct, Expr as SQLExpr, GroupByExpr, NamedWindowExpr,
50+
OrderBy, SelectItemQualifiedWildcardKind, WildcardAdditionalOptions, WindowType,
5051
};
5152
use sqlparser::ast::{NamedWindowDefinition, Select, SelectItem, TableWithJoins};
5253

@@ -891,29 +892,42 @@ fn match_window_definitions(
891892
named_windows: &[NamedWindowDefinition],
892893
) -> Result<()> {
893894
for proj in projection.iter_mut() {
894-
if let SelectItem::ExprWithAlias {
895-
expr: SQLExpr::Function(f),
896-
alias: _,
897-
}
898-
| SelectItem::UnnamedExpr(SQLExpr::Function(f)) = proj
895+
if let SelectItem::ExprWithAlias { expr, alias: _ }
896+
| SelectItem::UnnamedExpr(expr) = proj
899897
{
900-
for NamedWindowDefinition(window_ident, window_expr) in named_windows.iter() {
901-
if let Some(WindowType::NamedWindow(ident)) = &f.over {
902-
if ident.eq(window_ident) {
903-
f.over = Some(match window_expr {
904-
NamedWindowExpr::NamedWindow(ident) => {
905-
WindowType::NamedWindow(ident.clone())
906-
}
907-
NamedWindowExpr::WindowSpec(spec) => {
908-
WindowType::WindowSpec(spec.clone())
898+
let mut err = None;
899+
visit_expressions_mut(expr, |expr| {
900+
if let SQLExpr::Function(f) = expr {
901+
if let Some(WindowType::NamedWindow(_)) = &f.over {
902+
for NamedWindowDefinition(window_ident, window_expr) in
903+
named_windows
904+
{
905+
if let Some(WindowType::NamedWindow(ident)) = &f.over {
906+
if ident.eq(window_ident) {
907+
f.over = Some(match window_expr {
908+
NamedWindowExpr::NamedWindow(ident) => {
909+
WindowType::NamedWindow(ident.clone())
910+
}
911+
NamedWindowExpr::WindowSpec(spec) => {
912+
WindowType::WindowSpec(spec.clone())
913+
}
914+
})
915+
}
909916
}
910-
})
917+
}
918+
// All named windows must be defined with a WindowSpec.
919+
if let Some(WindowType::NamedWindow(ident)) = &f.over {
920+
err = Some(DataFusionError::Plan(format!(
921+
"The window {ident} is not defined!"
922+
)));
923+
return ControlFlow::Break(());
924+
}
911925
}
912926
}
913-
}
914-
// All named windows must be defined with a WindowSpec.
915-
if let Some(WindowType::NamedWindow(ident)) = &f.over {
916-
return plan_err!("The window {ident} is not defined!");
927+
ControlFlow::Continue(())
928+
});
929+
if let Some(err) = err {
930+
return Err(err);
917931
}
918932
}
919933
}

datafusion/sqllogictest/test_files/window.slt

Lines changed: 15 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -5537,6 +5537,21 @@ physical_plan
55375537
02)--WindowAggExec: wdw=[max(aggregate_test_100_ordered.c5) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "max(aggregate_test_100_ordered.c5) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }]
55385538
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c5], file_type=csv, has_header=true
55395539

5540+
query II rowsort
5541+
SELECT
5542+
t1.v1,
5543+
SUM(t1.v1) OVER w + 1
5544+
FROM
5545+
generate_series(1, 5) AS t1(v1)
5546+
WINDOW
5547+
w AS (ORDER BY t1.v1);
5548+
----
5549+
1 2
5550+
2 4
5551+
3 7
5552+
4 11
5553+
5 16
5554+
55405555
# Testing Utf8View with window
55415556
statement ok
55425557
CREATE TABLE aggregate_test_100_utf8view AS SELECT

0 commit comments

Comments (0)