
Commit 3e58c46

Authored and committed by Альберт Скальт
push down filter: extend a projection if some pushed filters become unsupported
Consider the following scenario:

1. `supports_filters_pushdown` returns `Exact` for some filter, e.g. "a = 1", where column "a" is not required by the query projection.
2. "a" is removed from the table provider projection by the "optimize projection" rule.
3. On the next pass, `supports_filters_pushdown` changes its decision and returns `Inexact` for this filter, for example because the set of input filters changed and it now prefers a different one.
4. "a" is not added back to the table provider projection, which leaves a filter referencing a column that is not part of the schema.

This patch fixes the issue by introducing the following logic in the filter push-down rule:

1. Collect the columns that are not used in the current table provider projection but are required by the filter expressions; call this set `additional_projection`.
2. If `additional_projection` is empty, leave everything as before.
3. Otherwise, extend the table provider projection and wrap the plan in an additional projection node to preserve the schema produced prior to this rule.
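For illustration, the regression test added in this commit starts from a scan that projects only column "a" while still carrying a previously pushed filter on "b" that the provider no longer supports. After this rule runs, the plan keeps its original output schema:

    Projection: test.a
      Filter: a = Int64(1) AND b = Int64(1)
        TableScan: test projection=[a, b]

The scan projection is extended with "b" so the filter can be evaluated, and the wrapping projection restores the single-column schema expected by the rest of the plan.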
1 parent 6162c56 commit 3e58c46

2 files changed: +129, -22 lines changed

datafusion/optimizer/src/push_down_filter.rs

Lines changed: 125 additions & 17 deletions
@@ -28,15 +28,13 @@ use datafusion_common::{
     JoinConstraint, Result,
 };
 use datafusion_expr::expr_rewriter::replace_col;
-use datafusion_expr::logical_plan::{
-    CrossJoin, Join, JoinType, LogicalPlan, TableScan, Union,
-};
+use datafusion_expr::logical_plan::{CrossJoin, Join, JoinType, LogicalPlan, Union};
 use datafusion_expr::utils::{
     conjunction, expr_to_columns, split_conjunction, split_conjunction_owned,
 };
 use datafusion_expr::{
     and, build_join_schema, or, BinaryExpr, Expr, Filter, LogicalPlanBuilder, Operator,
-    Projection, TableProviderFilterPushDown,
+    Projection, TableProviderFilterPushDown, TableScan,
 };
 
 use crate::optimizer::ApplyOrder;
@@ -897,23 +895,106 @@ impl OptimizerRule for PushDownFilter {
                     .map(|(pred, _)| pred);
                 let new_scan_filters: Vec<Expr> =
                     new_scan_filters.unique().cloned().collect();
+
+                let source_schema = scan.source.schema();
+                let mut additional_projection = HashSet::new();
                 let new_predicate: Vec<Expr> = zip
-                    .filter(|(_, res)| res != &TableProviderFilterPushDown::Exact)
+                    .filter(|(expr, res)| {
+                        if *res == TableProviderFilterPushDown::Exact {
+                            return false;
+                        }
+                        expr.apply(|expr| {
+                            if let Expr::Column(column) = expr {
+                                if let Ok(idx) = source_schema.index_of(column.name()) {
+                                    if scan
+                                        .projection
+                                        .as_ref()
+                                        .is_some_and(|p| !p.contains(&idx))
+                                    {
+                                        additional_projection.insert(idx);
+                                    }
+                                }
+                            }
+                            Ok(TreeNodeRecursion::Continue)
+                        })
+                        .unwrap();
+                        true
+                    })
                     .map(|(pred, _)| pred.clone())
                     .collect();
 
-                let new_scan = LogicalPlan::TableScan(TableScan {
-                    filters: new_scan_filters,
-                    ..scan
-                });
+                let scan_source = Arc::clone(&scan.source);
+                let scan_table_name = scan.table_name.clone();
 
-                Transformed::yes(new_scan).transform_data(|new_scan| {
-                    if let Some(predicate) = conjunction(new_predicate) {
-                        make_filter(predicate, Arc::new(new_scan)).map(Transformed::yes)
+                // Wraps with a filter if some filters are not supported exactly.
+                let filtered = move |plan| {
+                    if let Some(new_predicate) = conjunction(new_predicate) {
+                        Filter::try_new(new_predicate, Arc::new(plan))
+                            .map(LogicalPlan::Filter)
                     } else {
-                        Ok(Transformed::no(new_scan))
+                        Ok(plan)
                     }
-                })
+                };
+
+                if additional_projection.is_empty() {
+                    // No additional projection is required.
+                    let new_scan = LogicalPlan::TableScan(TableScan {
+                        filters: new_scan_filters,
+                        ..scan
+                    });
+                    return filtered(new_scan).map(Transformed::yes);
+                }
+
+                let new_scan = filtered(
+                    LogicalPlanBuilder::scan_with_filters_fetch(
+                        scan_table_name.clone(),
+                        scan.source,
+                        scan.projection.clone().map(|mut projection| {
+                            // Extend a projection.
+                            projection.extend(additional_projection);
+                            projection
+                        }),
+                        new_scan_filters,
+                        scan.fetch,
+                    )?
+                    .build()?,
+                )?;
+
+                // Project fields required by the initial projection.
+                let source_schema = scan_source.schema();
+                let new_plan = LogicalPlan::Projection(Projection::try_new_with_schema(
+                    scan.projection
+                        .as_ref()
+                        .map(|projection| {
+                            projection
+                                .into_iter()
+                                .cloned()
+                                .map(|idx| {
+                                    Expr::Column(Column::new(
+                                        Some(scan_table_name.clone()),
+                                        source_schema.field(idx).name(),
+                                    ))
+                                })
+                                .collect()
+                        })
+                        .unwrap_or_else(|| {
+                            source_schema
+                                .fields()
+                                .iter()
+                                .map(|field| {
+                                    Expr::Column(Column::new(
+                                        Some(scan_table_name.clone()),
+                                        field.name(),
+                                    ))
+                                })
+                                .collect()
+                        }),
+                    Arc::new(new_scan),
+                    // Preserve a projected schema.
+                    scan.projected_schema,
+                )?);
+
+                Ok(Transformed::yes(new_plan))
             }
             LogicalPlan::Extension(extension_plan) => {
                 let prevent_cols =
@@ -1206,8 +1287,8 @@ mod tests {
     use datafusion_expr::logical_plan::table_scan;
     use datafusion_expr::{
         col, in_list, in_subquery, lit, ColumnarValue, Extension, ScalarUDF,
-        ScalarUDFImpl, Signature, TableSource, TableType, UserDefinedLogicalNodeCore,
-        Volatility,
+        ScalarUDFImpl, Signature, TableScan, TableSource, TableType,
+        UserDefinedLogicalNodeCore, Volatility,
     };
 
     use crate::optimizer::Optimizer;
@@ -2452,6 +2533,33 @@ mod tests {
             .build()
     }
 
+    #[test]
+    fn projection_is_updated_when_filter_becomes_unsupported() -> Result<()> {
+        let test_provider = PushDownProvider {
+            filter_support: TableProviderFilterPushDown::Unsupported,
+        };
+
+        let projected_schema = test_provider.schema().project(&[0])?;
+        let table_scan = LogicalPlan::TableScan(TableScan {
+            table_name: "test".into(),
+            // Emulate that filters were pushed down earlier, but the
+            // provider can no longer support them.
+            filters: vec![col("b").eq(lit(1i64))],
+            projected_schema: Arc::new(DFSchema::try_from(projected_schema)?),
+            projection: Some(vec![0]),
+            source: Arc::new(test_provider),
+            fetch: None,
+        });
+
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .filter(col("a").eq(lit(1i64)))?
+            .build()?;
+
+        let expected = "\
+        Projection: test.a\n  Filter: a = Int64(1) AND b = Int64(1)\n    TableScan: test projection=[a, b]";
+        assert_optimized_plan_eq(plan, expected)
+    }
+
     #[test]
     fn filter_with_table_provider_exact() -> Result<()> {
         let plan = table_scan_with_pushdown_provider(TableProviderFilterPushDown::Exact)?;
@@ -2514,7 +2622,7 @@ mod tests {
             projected_schema: Arc::new(DFSchema::try_from(
                 (*test_provider.schema()).clone(),
             )?),
-            projection: Some(vec![0]),
+            projection: Some(vec![0, 1]),
             source: Arc::new(test_provider),
             fetch: None,
         });
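As background for step 3 of the scenario in the commit message, a provider's pushdown answer may legitimately depend on the whole set of filters it is offered, so an answer that was Exact on one pass can become Inexact on the next. Below is a minimal sketch of such decision logic, using only the Expr and TableProviderFilterPushDown types from datafusion_expr; the function name and the "first filter wins" policy are hypothetical, not part of this commit.

use datafusion_expr::{Expr, TableProviderFilterPushDown};

// Hypothetical policy: push down at most one filter exactly (the first one
// offered) and report the rest as Inexact. If the optimizer offers a
// different filter list on a later pass, a filter that was previously Exact
// can come back as Inexact -- the situation this commit guards against.
fn classify_filters(filters: &[&Expr]) -> Vec<TableProviderFilterPushDown> {
    filters
        .iter()
        .enumerate()
        .map(|(i, _)| {
            if i == 0 {
                TableProviderFilterPushDown::Exact
            } else {
                TableProviderFilterPushDown::Inexact
            }
        })
        .collect()
}

A provider whose supports_filters_pushdown behaves like this can flip a filter from Exact to Inexact once the pushed filter set changes, which is the case the projection-extension logic above handles.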

datafusion/sql/src/parser.rs

Lines changed: 4 additions & 5 deletions
@@ -279,15 +279,14 @@ impl<'a> DFParser<'a> {
         sql: &str,
         dialect: &'a dyn Dialect,
     ) -> Result<Self, ParserError> {
-        let tokens = Tokenizer::new(dialect, sql).into_tokens().collect::<Result<_, _>>()?;
+        let tokens = Tokenizer::new(dialect, sql)
+            .into_tokens()
+            .collect::<Result<_, _>>()?;
         Ok(Self::from_dialect_and_tokens(dialect, tokens))
     }
 
     /// Create a new parser from specified dialect and tokens.
-    pub fn from_dialect_and_tokens(
-        dialect: &'a dyn Dialect,
-        tokens: Vec<Token>,
-    ) -> Self {
+    pub fn from_dialect_and_tokens(dialect: &'a dyn Dialect, tokens: Vec<Token>) -> Self {
         let parser = Parser::new(dialect).with_tokens(tokens);
         DFParser { parser }
     }
