Skip to content

Commit 81d06f2

Browse files
authored
Move OutputRequirements to datafusion-physical-optimizer crate (#11579)
* Move OutputRequirements to datafusion-physical-optimizer crate
* Fix fmt
* Fix cargo for cli
1 parent 51da92f commit 81d06f2

File tree

6 files changed: +25 additions, -17 deletions (lines changed)

datafusion-cli/Cargo.lock

Lines changed: 6 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/core/src/physical_optimizer/enforce_distribution.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -24,7 +24,6 @@
2424
use std::fmt::Debug;
2525
use std::sync::Arc;
2626

27-
use super::output_requirements::OutputRequirementExec;
2827
use crate::config::ConfigOptions;
2928
use crate::error::Result;
3029
use crate::physical_optimizer::utils::{
@@ -55,6 +54,7 @@ use datafusion_physical_expr::{
5554
use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAggExec};
5655
use datafusion_physical_plan::ExecutionPlanProperties;
5756

57+
use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
5858
use datafusion_physical_optimizer::PhysicalOptimizerRule;
5959
use itertools::izip;
6060

@@ -1290,7 +1290,6 @@ pub(crate) mod tests {
12901290
use crate::datasource::object_store::ObjectStoreUrl;
12911291
use crate::datasource::physical_plan::{CsvExec, FileScanConfig, ParquetExec};
12921292
use crate::physical_optimizer::enforce_sorting::EnforceSorting;
1293-
use crate::physical_optimizer::output_requirements::OutputRequirements;
12941293
use crate::physical_optimizer::test_utils::{
12951294
check_integrity, coalesce_partitions_exec, repartition_exec,
12961295
};
@@ -1301,6 +1300,7 @@ pub(crate) mod tests {
13011300
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
13021301
use crate::physical_plan::sorts::sort::SortExec;
13031302
use crate::physical_plan::{displayable, DisplayAs, DisplayFormatType, Statistics};
1303+
use datafusion_physical_optimizer::output_requirements::OutputRequirements;
13041304

13051305
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
13061306
use datafusion_common::ScalarValue;

datafusion/core/src/physical_optimizer/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -29,7 +29,6 @@ pub mod enforce_sorting;
2929
pub mod join_selection;
3030
pub mod limited_distinct_aggregation;
3131
pub mod optimizer;
32-
pub mod output_requirements;
3332
pub mod projection_pushdown;
3433
pub mod pruning;
3534
pub mod replace_with_order_preserving_variants;

datafusion/physical-optimizer/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -33,4 +33,6 @@ workspace = true
3333

3434
[dependencies]
3535
datafusion-common = { workspace = true, default-features = true }
36+
datafusion-execution = { workspace = true }
37+
datafusion-physical-expr = { workspace = true }
3638
datafusion-physical-plan = { workspace = true }

datafusion/physical-optimizer/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -18,5 +18,6 @@
1818
#![deny(clippy::clone_on_ref_ptr)]
1919

2020
mod optimizer;
21+
pub mod output_requirements;
2122

2223
pub use optimizer::PhysicalOptimizerRule;

datafusion/core/src/physical_optimizer/output_requirements.rs renamed to datafusion/physical-optimizer/src/output_requirements.rs

Lines changed: 14 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -24,17 +24,21 @@
2424
2525
use std::sync::Arc;
2626

27-
use crate::physical_plan::sorts::sort::SortExec;
28-
use crate::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
27+
use datafusion_execution::TaskContext;
28+
use datafusion_physical_plan::sorts::sort::SortExec;
29+
use datafusion_physical_plan::{
30+
DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream,
31+
};
2932

3033
use datafusion_common::config::ConfigOptions;
3134
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
3235
use datafusion_common::{Result, Statistics};
3336
use datafusion_physical_expr::{Distribution, LexRequirement, PhysicalSortRequirement};
34-
use datafusion_physical_optimizer::PhysicalOptimizerRule;
3537
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
3638
use datafusion_physical_plan::{ExecutionPlanProperties, PlanProperties};
3739

40+
use crate::PhysicalOptimizerRule;
41+
3842
/// This rule either adds or removes [`OutputRequirements`]s to/from the physical
3943
/// plan according to its `mode` attribute, which is set by the constructors
4044
/// `new_add_mode` and `new_remove_mode`. With this rule, we can keep track of
@@ -86,15 +90,15 @@ enum RuleMode {
8690
///
8791
/// See [`OutputRequirements`] for more details
8892
#[derive(Debug)]
89-
pub(crate) struct OutputRequirementExec {
93+
pub struct OutputRequirementExec {
9094
input: Arc<dyn ExecutionPlan>,
9195
order_requirement: Option<LexRequirement>,
9296
dist_requirement: Distribution,
9397
cache: PlanProperties,
9498
}
9599

96100
impl OutputRequirementExec {
97-
pub(crate) fn new(
101+
pub fn new(
98102
input: Arc<dyn ExecutionPlan>,
99103
requirements: Option<LexRequirement>,
100104
dist_requirement: Distribution,
@@ -108,8 +112,8 @@ impl OutputRequirementExec {
108112
}
109113
}
110114

111-
pub(crate) fn input(&self) -> Arc<dyn ExecutionPlan> {
112-
self.input.clone()
115+
pub fn input(&self) -> Arc<dyn ExecutionPlan> {
116+
Arc::clone(&self.input)
113117
}
114118

115119
/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
@@ -179,8 +183,8 @@ impl ExecutionPlan for OutputRequirementExec {
179183
fn execute(
180184
&self,
181185
_partition: usize,
182-
_context: Arc<crate::execution::context::TaskContext>,
183-
) -> Result<crate::physical_plan::SendableRecordBatchStream> {
186+
_context: Arc<TaskContext>,
187+
) -> Result<SendableRecordBatchStream> {
184188
unreachable!();
185189
}
186190

@@ -275,7 +279,7 @@ fn require_top_ordering_helper(
275279
// When an operator requires an ordering, any `SortExec` below can not
276280
// be responsible for (i.e. the originator of) the global ordering.
277281
let (new_child, is_changed) =
278-
require_top_ordering_helper(children.swap_remove(0).clone())?;
282+
require_top_ordering_helper(Arc::clone(children.swap_remove(0)))?;
279283
Ok((plan.with_new_children(vec![new_child])?, is_changed))
280284
} else {
281285
// Stop searching, there is no global ordering desired for the query.

0 commit comments

Comments
 (0)