
Commit 2938fb1

logan-keede, buraksenn, and berkaysynnada authored
Move EnforceDistribution into datafusion-physical-optimizer crate (#14190)
* moving enforce_distribution
* formatting fix
* Update Cargo.toml to fix circular dependency
* pull source
* move tests to core integration tests
* remove tests from enforce_distribution.rs
* passes lint
* forgotten license header
* move enforce_sorting
* import order
* merge fixes + formatting
* fix: forgotten license
* fix: cargo fmt
* fix tests
* fix: ci tests
* fix: Cargo.toml formatting
* further removals
* Further migrations and simplifications
* Fix failing tests
* Final pass
* Update datafusion-testing
* Update test_utils.rs
* fix the dep
* Update Cargo.toml

---------

Co-authored-by: buraksenn <[email protected]>
Co-authored-by: berkaysynnada <[email protected]>
1 parent 0ba6e70 commit 2938fb1

26 files changed: +6723 / -6736 lines
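For code that used the rule from core, the practical effect is an import-path change: EnforceDistribution now lives in the standalone datafusion-physical-optimizer crate instead of datafusion::physical_optimizer. Below is a minimal sketch of applying the relocated rule to an existing physical plan, assuming the module keeps the enforce_distribution name shown in the diffs below and that the crate exports the PhysicalOptimizerRule trait at its root; the wrapper function itself is illustrative only.

use std::sync::Arc;

use datafusion_common::config::ConfigOptions;
use datafusion_common::Result;
use datafusion_physical_optimizer::enforce_distribution::EnforceDistribution;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::ExecutionPlan;

/// Apply the relocated rule to an already-built physical plan.
fn repartition(
    plan: Arc<dyn ExecutionPlan>,
    config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
    // EnforceDistribution walks the plan and inserts RepartitionExec nodes
    // wherever an operator's distribution requirements are not yet satisfied.
    EnforceDistribution::new().optimize(plan, config)
}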

datafusion-cli/Cargo.lock

Lines changed: 1 addition & 0 deletions
Generated file; diff not rendered.

datafusion/core/src/physical_optimizer/enforce_distribution.rs

Lines changed: 0 additions & 4662 deletions
This file was deleted.

datafusion/core/src/physical_optimizer/mod.rs

Lines changed: 0 additions & 1 deletion
@@ -22,7 +22,6 @@
 //!
 //! [`ExecutionPlan`]: crate::physical_plan::ExecutionPlan
 
-pub mod enforce_distribution;
 pub mod optimizer;
 pub mod projection_pushdown;
 

datafusion/core/src/test_util/mod.rs

Lines changed: 0 additions & 58 deletions
@@ -44,11 +44,7 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use datafusion_catalog::Session;
 use datafusion_common::TableReference;
-use datafusion_expr::utils::COUNT_STAR_EXPANSION;
 use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType};
-use datafusion_functions_aggregate::count::count_udaf;
-use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
-use datafusion_physical_expr::{expressions, PhysicalExpr};
 
 use async_trait::async_trait;
 use futures::Stream;
@@ -276,57 +272,3 @@ pub fn bounded_stream(batch: RecordBatch, limit: usize) -> SendableRecordBatchSt
         batch,
     })
 }
-
-/// Describe the type of aggregate being tested
-pub enum TestAggregate {
-    /// Testing COUNT(*) type aggregates
-    CountStar,
-
-    /// Testing for COUNT(column) aggregate
-    ColumnA(Arc<Schema>),
-}
-
-impl TestAggregate {
-    /// Create a new COUNT(*) aggregate
-    pub fn new_count_star() -> Self {
-        Self::CountStar
-    }
-
-    /// Create a new COUNT(column) aggregate
-    pub fn new_count_column(schema: &Arc<Schema>) -> Self {
-        Self::ColumnA(Arc::clone(schema))
-    }
-
-    /// Return appropriate expr depending if COUNT is for col or table (*)
-    pub fn count_expr(&self, schema: &Schema) -> AggregateFunctionExpr {
-        AggregateExprBuilder::new(count_udaf(), vec![self.column()])
-            .schema(Arc::new(schema.clone()))
-            .alias(self.column_name())
-            .build()
-            .unwrap()
-    }
-
-    /// what argument would this aggregate need in the plan?
-    fn column(&self) -> Arc<dyn PhysicalExpr> {
-        match self {
-            Self::CountStar => expressions::lit(COUNT_STAR_EXPANSION),
-            Self::ColumnA(s) => expressions::col("a", s).unwrap(),
-        }
-    }
-
-    /// What name would this aggregate produce in a plan?
-    pub fn column_name(&self) -> &'static str {
-        match self {
-            Self::CountStar => "COUNT(*)",
-            Self::ColumnA(_) => "COUNT(a)",
-        }
-    }
-
-    /// What is the expected count?
-    pub fn expected_count(&self) -> i64 {
-        match self {
-            TestAggregate::CountStar => 3,
-            TestAggregate::ColumnA(_) => 2,
-        }
-    }
-}
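The removed TestAggregate helper is part of the test migration described in the commit message ("move tests to core integration tests", "Update test_utils.rs"); its new home is not shown in this diff. For reference, its COUNT(*) branch reduces to the builder calls already visible in the deleted lines above; only the free-function wrapper below is new.

use std::sync::Arc;

use arrow::datatypes::Schema;
use datafusion_expr::utils::COUNT_STAR_EXPANSION;
use datafusion_functions_aggregate::count::count_udaf;
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion_physical_expr::expressions;

/// Build the COUNT(*) aggregate expression that TestAggregate::CountStar produced.
fn count_star_expr(schema: &Schema) -> AggregateFunctionExpr {
    // COUNT(*) is expanded to COUNT(COUNT_STAR_EXPANSION), i.e. COUNT(1),
    // and aliased to the display name "COUNT(*)".
    AggregateExprBuilder::new(count_udaf(), vec![expressions::lit(COUNT_STAR_EXPANSION)])
        .schema(Arc::new(schema.clone()))
        .alias("COUNT(*)")
        .build()
        .unwrap()
}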

datafusion/core/tests/core_integration.rs

Lines changed: 0 additions & 1 deletion
@@ -42,7 +42,6 @@ mod custom_sources_cases;
 /// Run all tests that are found in the `optimizer` directory
 mod optimizer;
 
-/// Run all tests that are found in the `physical_optimizer` directory
 mod physical_optimizer;
 
 #[cfg(test)]

datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs

Lines changed: 4 additions & 18 deletions
@@ -19,14 +19,15 @@
 //!
 //! Note these tests are not in the same module as the optimizer pass because
 //! they rely on `ParquetExec` which is in the core crate.
+
 use std::sync::Arc;
 
+use crate::physical_optimizer::parquet_exec;
+
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use datafusion::datasource::listing::PartitionedFile;
-use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
 use datafusion::physical_optimizer::combine_partial_final_agg::CombinePartialFinalAggregate;
+use datafusion::physical_optimizer::test_utils::trim_plan_display;
 use datafusion_common::config::ConfigOptions;
-use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_functions_aggregate::count::count_udaf;
 use datafusion_functions_aggregate::sum::sum_udaf;
 use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
@@ -62,13 +63,6 @@ macro_rules! assert_optimized {
     };
 }
 
-fn trim_plan_display(plan: &str) -> Vec<&str> {
-    plan.split('\n')
-        .map(|s| s.trim())
-        .filter(|s| !s.is_empty())
-        .collect()
-}
-
 fn schema() -> SchemaRef {
     Arc::new(Schema::new(vec![
         Field::new("a", DataType::Int64, true),
@@ -77,14 +71,6 @@ fn schema() -> SchemaRef {
     ]))
 }
 
-fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
-    ParquetExec::builder(
-        FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema.clone())
-            .with_file(PartitionedFile::new("x".to_string(), 100)),
-    )
-    .build_arc()
-}
-
 fn partial_aggregate_exec(
     input: Arc<dyn ExecutionPlan>,
     group_by: PhysicalGroupBy,
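The two local helpers removed here are replaced by shared ones: parquet_exec is now imported from the test crate's physical_optimizer module and trim_plan_display from datafusion::physical_optimizer::test_utils, per the imports added in the first hunk. A rough usage sketch of the shared trim_plan_display follows, with a made-up plan string and assuming it keeps the &str -> Vec<&str> signature of the deleted local copy.

use datafusion::physical_optimizer::test_utils::trim_plan_display;

fn main() {
    // Indented, multi-line plan rendering (hypothetical example text).
    let plan = "
        AggregateExec: mode=Final, gby=[], aggr=[COUNT(1)]
          AggregateExec: mode=Partial, gby=[], aggr=[COUNT(1)]
            ParquetExec: file_groups={1 group: [[x]]}
    ";
    // Trims indentation and drops empty lines so tests can compare line-by-line.
    let lines = trim_plan_display(plan);
    assert_eq!(lines.len(), 3);
    assert!(lines[0].starts_with("AggregateExec: mode=Final"));
}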
