Skip to content

Move EnforceDistribution into datafusion-physical-optimizer crate #14190

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 29 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
1f12ffe
moving enforce_distribution
logan-keede Jan 18, 2025
b58f9d6
formatting fix
logan-keede Jan 18, 2025
0d571a7
Update Cargo.toml
logan-keede Jan 18, 2025
24ab913
pull source
logan-keede Jan 19, 2025
a7f8927
move_tests to core integration tests
logan-keede Jan 19, 2025
0296d04
remove tests from enforce_distribution.rs
logan-keede Jan 20, 2025
1d94407
Merge branch 'move_crates' of https://github.com/logan-keede/datafusi…
logan-keede Jan 20, 2025
7cd4b31
passes lint
buraksenn Jan 20, 2025
eb8ac39
forgotten license header
buraksenn Jan 20, 2025
8103f00
move enforce_sorting
logan-keede Jan 20, 2025
460c92d
merge main
buraksenn Jan 21, 2025
fef3c6e
import order
buraksenn Jan 21, 2025
7c1dad6
Merge branch 'move-enforce-sorting-to-new-crate' of https://github.co…
logan-keede Jan 21, 2025
dcbc0e1
merge fixes + formatting
logan-keede Jan 21, 2025
1f116e4
fix: forgotten license
logan-keede Jan 21, 2025
5093e61
fix: cargo fmt
logan-keede Jan 21, 2025
9c40e49
fix tests
buraksenn Jan 21, 2025
3d78ef2
Merge branch 'move-enforce-sorting-to-new-crate' of https://github.co…
logan-keede Jan 21, 2025
9d19863
fix: ci tests
logan-keede Jan 21, 2025
eba718a
fix: Cargo.toml formatting
logan-keede Jan 21, 2025
8926f90
Merge branch 'apache_main' into move_crates
berkaysynnada Jan 22, 2025
afb20f8
further removals
berkaysynnada Jan 22, 2025
f04332d
Further migrations and simplifications
berkaysynnada Jan 22, 2025
20d47d3
Fix failing tests
berkaysynnada Jan 22, 2025
b13ba8e
Final pass
berkaysynnada Jan 22, 2025
9e1dfbb
Update datafusion-testing
berkaysynnada Jan 22, 2025
b7373fb
Update test_utils.rs
berkaysynnada Jan 22, 2025
a8fe5ca
fix the dep
berkaysynnada Jan 22, 2025
99a419f
Update Cargo.toml
berkaysynnada Jan 22, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4,662 changes: 0 additions & 4,662 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs

This file was deleted.

1 change: 0 additions & 1 deletion datafusion/core/src/physical_optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
//!
//! [`ExecutionPlan`]: crate::physical_plan::ExecutionPlan

pub mod enforce_distribution;
pub mod optimizer;
pub mod projection_pushdown;

Expand Down
58 changes: 0 additions & 58 deletions datafusion/core/src/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,7 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_catalog::Session;
use datafusion_common::TableReference;
use datafusion_expr::utils::COUNT_STAR_EXPANSION;
use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType};
use datafusion_functions_aggregate::count::count_udaf;
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion_physical_expr::{expressions, PhysicalExpr};

use async_trait::async_trait;
use futures::Stream;
Expand Down Expand Up @@ -276,57 +272,3 @@ pub fn bounded_stream(batch: RecordBatch, limit: usize) -> SendableRecordBatchSt
batch,
})
}

/// Describes which flavor of `COUNT` aggregate a test exercises.
///
/// The variant selects the argument expression, the output column name and
/// the expected result reported by the associated methods.
pub enum TestAggregate {
/// A `COUNT(*)` aggregate (named `"COUNT(*)"` in plans).
CountStar,

/// A `COUNT(a)` aggregate; carries the schema used to resolve column `"a"`.
ColumnA(Arc<Schema>),
}

impl TestAggregate {
    /// Builds the `COUNT(*)` variant.
    pub fn new_count_star() -> Self {
        Self::CountStar
    }

    /// Builds the `COUNT(a)` variant, sharing ownership of `schema`.
    pub fn new_count_column(schema: &Arc<Schema>) -> Self {
        let shared_schema = Arc::clone(schema);
        Self::ColumnA(shared_schema)
    }

    /// Builds the `COUNT` aggregate expression for this variant.
    ///
    /// The expression is aliased with [`Self::column_name`] and built against
    /// a copy of `schema`.
    ///
    /// # Panics
    ///
    /// Panics if the builder fails, or if the argument expression cannot be
    /// created (see the `unwrap` calls here and in `column`).
    pub fn count_expr(&self, schema: &Schema) -> AggregateFunctionExpr {
        let udaf = count_udaf();
        let arguments = vec![self.column()];
        let owned_schema = Arc::new(schema.clone());
        let alias = self.column_name();

        AggregateExprBuilder::new(udaf, arguments)
            .schema(owned_schema)
            .alias(alias)
            .build()
            .unwrap()
    }

    /// Returns the physical expression passed as the aggregate's argument:
    /// column `"a"` for `ColumnA`, the `COUNT_STAR_EXPANSION` literal otherwise.
    fn column(&self) -> Arc<dyn PhysicalExpr> {
        match self {
            Self::ColumnA(column_schema) => {
                expressions::col("a", column_schema).unwrap()
            }
            Self::CountStar => expressions::lit(COUNT_STAR_EXPANSION),
        }
    }

    /// Returns the output name this aggregate is given in a plan.
    pub fn column_name(&self) -> &'static str {
        match self {
            Self::ColumnA(_) => "COUNT(a)",
            Self::CountStar => "COUNT(*)",
        }
    }

    /// Returns the count the tests expect for this variant
    /// (3 for `COUNT(*)`, 2 for `COUNT(a)`).
    pub fn expected_count(&self) -> i64 {
        match self {
            Self::ColumnA(_) => 2,
            Self::CountStar => 3,
        }
    }
}
1 change: 0 additions & 1 deletion datafusion/core/tests/core_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ mod custom_sources_cases;
/// Run all tests that are found in the `optimizer` directory
mod optimizer;

/// Run all tests that are found in the `physical_optimizer` directory
mod physical_optimizer;

#[cfg(test)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@
//!
//! Note these tests are not in the same module as the optimizer pass because
//! they rely on `ParquetExec` which is in the core crate.

use std::sync::Arc;

use crate::physical_optimizer::parquet_exec;

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
use datafusion::physical_optimizer::combine_partial_final_agg::CombinePartialFinalAggregate;
use datafusion::physical_optimizer::test_utils::trim_plan_display;
use datafusion_common::config::ConfigOptions;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_functions_aggregate::count::count_udaf;
use datafusion_functions_aggregate::sum::sum_udaf;
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
Expand Down Expand Up @@ -62,13 +63,6 @@ macro_rules! assert_optimized {
};
}

/// Splits a rendered plan into its lines, trimming surrounding whitespace
/// from each line and dropping lines that are empty after trimming.
///
/// The returned slices borrow from `plan`.
fn trim_plan_display(plan: &str) -> Vec<&str> {
    let mut trimmed_lines = Vec::new();
    for raw_line in plan.split('\n') {
        let line = raw_line.trim();
        if line.is_empty() {
            // Blank (or whitespace-only) lines carry no plan information.
            continue;
        }
        trimmed_lines.push(line);
    }
    trimmed_lines
}

fn schema() -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new("a", DataType::Int64, true),
Expand All @@ -77,14 +71,6 @@ fn schema() -> SchemaRef {
]))
}

/// Builds a `ParquetExec` for tests, scanning one placeholder file (`"x"`)
/// under the `test:///` object store URL with the given schema.
///
/// # Panics
///
/// Panics if `"test:///"` cannot be parsed as an object store URL.
fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
    let store_url = ObjectStoreUrl::parse("test:///").unwrap();
    let base_config = FileScanConfig::new(store_url, Arc::clone(schema));
    let placeholder_file = PartitionedFile::new(String::from("x"), 100);
    let scan_config = base_config.with_file(placeholder_file);

    ParquetExec::builder(scan_config).build_arc()
}

fn partial_aggregate_exec(
input: Arc<dyn ExecutionPlan>,
group_by: PhysicalGroupBy,
Expand Down
Loading
Loading