Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1113,6 +1113,11 @@ config_namespace! {
/// So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden.
pub enable_dynamic_filter_pushdown: bool, default = true

/// When set to true, the pluggable `ExpressionAnalyzerRegistry` from
/// `SessionState` is used for expression-level statistics estimation
/// (NDV, selectivity, min/max, null fraction) in physical plan operators.
pub use_expression_analyzer: bool, default = false

/// When set to true, the optimizer will insert filters before a join between
/// a nullable and non-nullable column to filter out nulls on the nullable side. This
/// filter can add additional overhead when the file format does not fully support
Expand Down Expand Up @@ -1245,9 +1250,11 @@ config_namespace! {
pub join_reordering: bool, default = true

/// When set to true, the physical plan optimizer uses the pluggable
/// `StatisticsRegistry` for statistics propagation across operators.
/// This enables more accurate cardinality estimates compared to each
/// operator's built-in `partition_statistics`.
/// `StatisticsRegistry` for a bottom-up statistics walk across operators,
/// enabling more accurate cardinality estimates. Enabling
/// `use_expression_analyzer` alongside this flag gives built-in
/// providers access to custom expression-level analyzers (NDV,
/// selectivity) for the operators they process.
pub use_statistics_registry: bool, default = false

/// When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin.
Expand Down
45 changes: 45 additions & 0 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ use datafusion_optimizer::{
Analyzer, AnalyzerRule, Optimizer, OptimizerConfig, OptimizerRule,
};
use datafusion_physical_expr::create_physical_expr;
use datafusion_physical_expr::expression_analyzer::ExpressionAnalyzerRegistry;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_optimizer::PhysicalOptimizerContext;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
Expand Down Expand Up @@ -167,6 +168,8 @@ pub struct SessionState {
extension_types: ExtensionTypeRegistryRef,
/// Deserializer registry for extensions.
serializer_registry: Arc<dyn SerializerRegistry>,
/// Registry for expression-level statistics analyzers (NDV, selectivity, etc.)
expression_analyzer_registry: Arc<ExpressionAnalyzerRegistry>,
/// Holds registered external FileFormat implementations
file_formats: HashMap<String, Arc<dyn FileFormatFactory>>,
/// Session configuration
Expand Down Expand Up @@ -212,6 +215,13 @@ impl PhysicalOptimizerContext for SessionState {
fn statistics_registry(&self) -> Option<&StatisticsRegistry> {
self.statistics_registry.as_ref()
}

fn expression_analyzer_registry(&self) -> Option<&Arc<ExpressionAnalyzerRegistry>> {
self.config_options()
.optimizer
.use_expression_analyzer
.then_some(&self.expression_analyzer_registry)
}
}

impl Debug for SessionState {
Expand All @@ -225,6 +235,10 @@ impl Debug for SessionState {
.field("runtime_env", &self.runtime_env)
.field("catalog_list", &self.catalog_list)
.field("serializer_registry", &self.serializer_registry)
.field(
"expression_analyzer_registry",
&self.expression_analyzer_registry,
)
.field("file_formats", &self.file_formats)
.field("execution_props", &self.execution_props)
.field("table_options", &self.table_options)
Expand Down Expand Up @@ -944,6 +958,11 @@ impl SessionState {
&self.serializer_registry
}

/// Return the [`ExpressionAnalyzerRegistry`] for expression-level statistics
pub fn expression_analyzer_registry(&self) -> &Arc<ExpressionAnalyzerRegistry> {
&self.expression_analyzer_registry
}

/// Return version of the cargo package that produced this query
pub fn version(&self) -> &str {
env!("CARGO_PKG_VERSION")
Expand Down Expand Up @@ -1024,6 +1043,7 @@ pub struct SessionStateBuilder {
window_functions: Option<Vec<Arc<WindowUDF>>>,
extension_types: Option<ExtensionTypeRegistryRef>,
serializer_registry: Option<Arc<dyn SerializerRegistry>>,
expression_analyzer_registry: Option<Arc<ExpressionAnalyzerRegistry>>,
file_formats: Option<Vec<Arc<dyn FileFormatFactory>>>,
config: Option<SessionConfig>,
table_options: Option<TableOptions>,
Expand Down Expand Up @@ -1066,6 +1086,7 @@ impl SessionStateBuilder {
window_functions: None,
extension_types: None,
serializer_registry: None,
expression_analyzer_registry: None,
file_formats: None,
table_options: None,
config: None,
Expand Down Expand Up @@ -1123,6 +1144,7 @@ impl SessionStateBuilder {
window_functions: Some(existing.window_functions.into_values().collect_vec()),
extension_types: Some(existing.extension_types),
serializer_registry: Some(existing.serializer_registry),
expression_analyzer_registry: Some(existing.expression_analyzer_registry),
file_formats: Some(existing.file_formats.into_values().collect_vec()),
config: Some(new_config),
table_options: Some(existing.table_options),
Expand Down Expand Up @@ -1381,6 +1403,15 @@ impl SessionStateBuilder {
self
}

/// Set the [`ExpressionAnalyzerRegistry`] for expression-level statistics
pub fn with_expression_analyzer_registry(
mut self,
expression_analyzer_registry: Arc<ExpressionAnalyzerRegistry>,
) -> Self {
self.expression_analyzer_registry = Some(expression_analyzer_registry);
self
}

/// Set the map of [`FileFormatFactory`]s
pub fn with_file_formats(
mut self,
Expand Down Expand Up @@ -1522,6 +1553,7 @@ impl SessionStateBuilder {
window_functions,
extension_types,
serializer_registry,
expression_analyzer_registry,
file_formats,
table_options,
config,
Expand Down Expand Up @@ -1561,6 +1593,8 @@ impl SessionStateBuilder {
extension_types: Arc::new(MemoryExtensionTypeRegistry::default()),
serializer_registry: serializer_registry
.unwrap_or_else(|| Arc::new(EmptySerializerRegistry)),
expression_analyzer_registry: expression_analyzer_registry
.unwrap_or_else(|| Arc::new(ExpressionAnalyzerRegistry::new())),
file_formats: HashMap::new(),
table_options: table_options.unwrap_or_else(|| {
TableOptions::default_from_session_config(config.options())
Expand Down Expand Up @@ -1748,6 +1782,13 @@ impl SessionStateBuilder {
&mut self.serializer_registry
}

/// Returns the current expression_analyzer_registry value
pub fn expression_analyzer_registry(
&mut self,
) -> &mut Option<Arc<ExpressionAnalyzerRegistry>> {
&mut self.expression_analyzer_registry
}

/// Returns the current file_formats value
pub fn file_formats(&mut self) -> &mut Option<Vec<Arc<dyn FileFormatFactory>>> {
&mut self.file_formats
Expand Down Expand Up @@ -1823,6 +1864,10 @@ impl Debug for SessionStateBuilder {
.field("runtime_env", &self.runtime_env)
.field("catalog_list", &self.catalog_list)
.field("serializer_registry", &self.serializer_registry)
.field(
"expression_analyzer_registry",
&self.expression_analyzer_registry,
)
.field("file_formats", &self.file_formats)
.field("execution_props", &self.execution_props)
.field("table_options", &self.table_options)
Expand Down
52 changes: 44 additions & 8 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ use crate::physical_plan::{
WindowExpr, displayable, windows,
};
use crate::schema_equivalence::schema_satisfied_by;
use datafusion_physical_expr::expression_analyzer::ExpressionAnalyzerRegistry;

use arrow::array::{RecordBatch, builder::StringBuilder};
use arrow::compute::SortOptions;
Expand Down Expand Up @@ -1105,12 +1106,12 @@ impl DefaultPhysicalPlanner {
input_schema.as_arrow(),
)? {
PlanAsyncExpr::Sync(PlannedExprResult::Expr(runtime_expr)) => {
FilterExecBuilder::new(
let builder = FilterExecBuilder::new(
Arc::clone(&runtime_expr[0]),
physical_input,
)
.with_batch_size(session_state.config().batch_size())
.build()?
.with_batch_size(session_state.config().batch_size());
builder.build()?
}
PlanAsyncExpr::Async(
async_map,
Expand Down Expand Up @@ -1653,15 +1654,16 @@ impl DefaultPhysicalPlanner {
{
// Use SortMergeJoin if hash join is not preferred
let join_on_len = join_on.len();
Arc::new(SortMergeJoinExec::try_new(
let exec = SortMergeJoinExec::try_new(
physical_left,
physical_right,
join_on,
join_filter,
*join_type,
vec![SortOptions::default(); join_on_len],
*null_equality,
)?)
)?;
Arc::new(exec)
} else if session_state.config().target_partitions() > 1
&& session_state.config().repartition_joins()
&& prefer_hash_join
Expand Down Expand Up @@ -2769,15 +2771,30 @@ impl DefaultPhysicalPlanner {
// to verify that the plan fulfills the base requirements.
InvariantChecker(InvariantLevel::Always).check(&plan)?;

let use_expression_analyzer = session_state
.config_options()
.optimizer
.use_expression_analyzer;
let mut new_plan = Arc::clone(&plan);
for optimizer in optimizers {
let before_schema = new_plan.schema();
let plan_before_rule = Arc::clone(&new_plan);
new_plan = optimizer
.optimize_with_context(new_plan, session_state)
.map_err(|e| {
DataFusionError::Context(optimizer.name().to_string(), Box::new(e))
})?;

// Re-inject ExpressionAnalyzer registry into any exec nodes created or replaced by
// this rule. Skip if the rule returned the same plan unchanged to
// avoid an O(nodes) walk for no-op rules.
if use_expression_analyzer && !Arc::ptr_eq(&plan_before_rule, &new_plan) {
new_plan = Self::inject_expression_analyzer(
new_plan,
session_state.expression_analyzer_registry(),
)?;
}

// This only checks the schema in release build, and performs additional checks in debug mode.
OptimizationInvariantChecker::new(optimizer)
.check(&new_plan, &before_schema)?;
Expand Down Expand Up @@ -2847,6 +2864,24 @@ impl DefaultPhysicalPlanner {
Ok(mem_exec)
}

/// Walks `plan` and injects `registry` into every exec node that accepts it,
/// skipping nodes that already have a registry set.
fn inject_expression_analyzer(
plan: Arc<dyn ExecutionPlan>,
registry: &Arc<ExpressionAnalyzerRegistry>,
) -> Result<Arc<dyn ExecutionPlan>> {
use datafusion_common::tree_node::{Transformed, TreeNode};

plan.transform_up(|node| {
if let Some(updated) = node.with_expression_analyzer_registry(registry) {
Ok(Transformed::yes(updated))
} else {
Ok(Transformed::no(node))
}
})
.map(|t| t.data)
}

fn create_project_physical_exec(
&self,
session_state: &SessionState,
Expand Down Expand Up @@ -2918,9 +2953,10 @@ impl DefaultPhysicalPlanner {
.into_iter()
.map(|(expr, alias)| ProjectionExpr { expr, alias })
.collect();
let new_proj_exec =
ProjectionExec::try_new(proj_exprs, Arc::new(async_exec))?;
Ok(Arc::new(new_proj_exec))
Ok(Arc::new(ProjectionExec::try_new(
proj_exprs,
Arc::new(async_exec),
)?))
}
_ => internal_err!("Unexpected PlanAsyncExpressions variant"),
}
Expand Down
Loading
Loading