Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,21 @@ jobs:
runs-on: ubuntu-latest
container:
image: amd64/rust
volumes:
- /usr/local:/host/usr/local
steps:
- name: Remove unnecessary preinstalled software
run: |
echo "Disk space before cleanup:"
df -h
# remove tool cache: about 8.5GB (GitHub mounts the host's /opt/hostedtoolcache as /__t)
rm -rf /__t/* || true
# remove Haskell runtime: about 6.3GB (host /usr/local/.ghcup)
rm -rf /host/usr/local/.ghcup || true
# remove Android library: about 7.8GB (host /usr/local/lib/android)
rm -rf /host/usr/local/lib/android || true
echo "Disk space after cleanup:"
df -h
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
with:
submodules: true
Expand Down
332 changes: 321 additions & 11 deletions datafusion/core/tests/physical_optimizer/enforce_distribution.rs

Large diffs are not rendered by default.

99 changes: 95 additions & 4 deletions datafusion/core/tests/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@
use std::sync::Arc;

use crate::memory_limit::DummyStreamPartition;
use crate::physical_optimizer::enforce_distribution::projection_exec_with_alias;
use crate::physical_optimizer::test_utils::{
aggregate_exec, bounded_window_exec, bounded_window_exec_with_partition,
check_integrity, coalesce_batches_exec, coalesce_partitions_exec, create_test_schema,
create_test_schema2, create_test_schema3, filter_exec, global_limit_exec,
hash_join_exec, local_limit_exec, memory_exec, parquet_exec, parquet_exec_with_sort,
projection_exec, repartition_exec, sort_exec, sort_exec_with_fetch, sort_expr,
sort_expr_options, sort_merge_join_exec, sort_preserving_merge_exec,
sort_preserving_merge_exec_with_fetch, spr_repartition_exec, stream_exec_ordered,
union_exec, RequirementsTestExec,
parquet_exec_with_stats, projection_exec, repartition_exec, schema, sort_exec,
sort_exec_with_fetch, sort_expr, sort_expr_options, sort_merge_join_exec,
sort_preserving_merge_exec, sort_preserving_merge_exec_with_fetch,
spr_repartition_exec, stream_exec_ordered, union_exec, RequirementsTestExec,
};

use arrow::compute::SortOptions;
Expand All @@ -48,6 +49,9 @@ use datafusion_physical_expr_common::sort_expr::{
};
use datafusion_physical_expr::{Distribution, Partitioning};
use datafusion_physical_expr::expressions::{col, BinaryExpr, Column, NotExpr};
use datafusion_physical_optimizer::sanity_checker::SanityCheckPlan;
use datafusion_physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
Expand Down Expand Up @@ -2302,6 +2306,93 @@ async fn test_commutativity() -> Result<()> {
Ok(())
}

/// Wraps `input` in an [`AggregateExec`] running in
/// [`AggregateMode::SinglePartitioned`] with no aggregate expressions.
///
/// Each `(column, alias)` entry in `alias_pairs` becomes one group-by
/// expression: `column` is resolved against the schema of `input` and is
/// exposed under `alias`.
///
/// # Panics
///
/// Panics if a column cannot be resolved or if the aggregate cannot be built.
fn single_partition_aggregate(
    input: Arc<dyn ExecutionPlan>,
    alias_pairs: Vec<(String, String)>,
) -> Arc<dyn ExecutionPlan> {
    // The shared test schema is handed to the aggregate as its schema argument.
    let test_schema = schema();

    let mut group_exprs = Vec::with_capacity(alias_pairs.len());
    for (column, alias) in &alias_pairs {
        let expr = col(column, &input.schema()).unwrap();
        group_exprs.push((expr, alias.to_string()));
    }

    let aggregate = AggregateExec::try_new(
        AggregateMode::SinglePartitioned,
        PhysicalGroupBy::new_single(group_exprs),
        vec![],
        vec![],
        input,
        test_schema,
    )
    .unwrap();

    Arc::new(aggregate)
}

/// Regression test: `EnforceSorting` must leave the `CoalescePartitionsExec`
/// that feeds a `SinglePartitioned` aggregate in place.
///
/// The plan is checked with `SanityCheckPlan` both before and after the
/// optimizer runs, and the optimized plan is compared against a golden plan
/// string.
#[tokio::test]
async fn test_preserve_needed_coalesce() -> Result<()> {
// Build the input to EnforceSorting: a projection (aliasing `b` to `value`)
// over a union of two parquet sources.
let plan = projection_exec_with_alias(
union_exec(vec![parquet_exec_with_stats(10000); 2]),
vec![
("a".to_string(), "a".to_string()),
("b".to_string(), "value".to_string()),
],
);
// Put a CoalescePartitionsExec on top of the projection.
let plan = Arc::new(CoalescePartitionsExec::new(plan));
let schema = schema();
// Sort key: column `a` with the default sort options.
let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
expr: col("a", &schema).unwrap(),
options: SortOptions::default(),
}])
.unwrap();
// Group by `a` (exposed as `a1`) in a single-partitioned aggregate, then sort.
let plan: Arc<dyn ExecutionPlan> =
single_partition_aggregate(plan, vec![("a".to_string(), "a1".to_string())]);
let plan = sort_exec(sort_key, plan);

// Starting plan: as in our test case.
assert_eq!(
get_plan_string(&plan),
vec![
"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
" AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
" CoalescePartitionsExec",
" ProjectionExec: expr=[a@0 as a, b@1 as value]",
" UnionExec",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
],
);

// The starting plan must already pass the sanity checker.
let checker = SanityCheckPlan::new().optimize(plan.clone(), &Default::default());
assert!(checker.is_ok());

// EnforceSorting must keep the coalesce below the aggregate. The only expected
// change is a SortPreservingMergeExec added on top of the existing sort.
let optimizer = EnforceSorting::new();
let optimized = optimizer.optimize(plan, &Default::default())?;
assert_eq!(
get_plan_string(&optimized),
vec![
"SortPreservingMergeExec: [a@0 ASC]",
" SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
" AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
" CoalescePartitionsExec",
" ProjectionExec: expr=[a@0 as a, b@1 as value]",
" UnionExec",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
],
);

// The optimized plan must still pass the sanity checker.
let checker = SanityCheckPlan::new();
let checker = checker.optimize(optimized, &Default::default());
assert!(checker.is_ok());

Ok(())
}

#[tokio::test]
async fn test_coalesce_propagate() -> Result<()> {
let schema = create_test_schema()?;
Expand Down
7 changes: 7 additions & 0 deletions datafusion/core/tests/physical_optimizer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,13 @@ pub fn check_integrity<T: Clone>(context: PlanContext<T>) -> Result<PlanContext<
.data()
}

/// Splits a rendered plan into its non-blank lines.
///
/// The input is split on `'\n'`; every piece has its leading and trailing
/// whitespace removed, and pieces that are empty after trimming are dropped.
/// The returned slices borrow from `plan`.
pub fn trim_plan_display(plan: &str) -> Vec<&str> {
    let mut lines = Vec::new();
    for raw_line in plan.split('\n') {
        let line = raw_line.trim();
        if line.is_empty() {
            // Blank (or whitespace-only) lines carry no plan information.
            continue;
        }
        lines.push(line);
    }
    lines
}

// construct a stream partition for test purposes
#[derive(Debug)]
pub struct TestStreamPartition {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/udaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ pub trait AggregateUDFImpl: Debug + DynEq + DynHash + Send + Sync {

// exclude the first function argument(= column) in ordered set aggregate function,
// because it is duplicated with the WITHIN GROUP clause in schema name.
let args = if self.is_ordered_set_aggregate() {
let args = if self.is_ordered_set_aggregate() && !order_by.is_empty() {
&args[1..]
} else {
&args[..]
Expand Down
40 changes: 36 additions & 4 deletions datafusion/physical-expr/src/equivalence/properties/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,43 @@ fn calculate_union_binary(
})
.collect::<Vec<_>>();

// TEMP HACK WORKAROUND
// Revert code from https://github.com/apache/datafusion/pull/12562
// Context: https://github.com/apache/datafusion/issues/13748
// Context: https://github.com/influxdata/influxdb_iox/issues/13038

// Next, calculate valid orderings for the union by searching for prefixes
// in both sides.
let mut orderings = UnionEquivalentOrderingBuilder::new();
orderings.add_satisfied_orderings(&lhs, &rhs)?;
orderings.add_satisfied_orderings(&rhs, &lhs)?;
let orderings = orderings.build();
let mut orderings = vec![];
for ordering in lhs.normalized_oeq_class().into_iter() {
let mut ordering: Vec<PhysicalSortExpr> = ordering.into();

// Progressively shorten the ordering to search for a satisfied prefix:
while !rhs.ordering_satisfy(ordering.clone())? {
ordering.pop();
}
// There is a non-trivial satisfied prefix, add it as a valid ordering:
if !ordering.is_empty() {
orderings.push(ordering);
}
}

for ordering in rhs.normalized_oeq_class().into_iter() {
let mut ordering: Vec<PhysicalSortExpr> = ordering.into();

// Progressively shorten the ordering to search for a satisfied prefix:
while !lhs.ordering_satisfy(ordering.clone())? {
ordering.pop();
}
// There is a non-trivial satisfied prefix, add it as a valid ordering:
if !ordering.is_empty() {
orderings.push(ordering);
}
}
let mut eq_properties = EquivalenceProperties::new(lhs.schema);
eq_properties.add_constants(constants)?;
eq_properties.add_orderings(orderings);

Ok(eq_properties)
}

Expand Down Expand Up @@ -122,6 +149,7 @@ struct UnionEquivalentOrderingBuilder {
orderings: Vec<LexOrdering>,
}

#[expect(unused)]
impl UnionEquivalentOrderingBuilder {
fn new() -> Self {
Self { orderings: vec![] }
Expand Down Expand Up @@ -504,6 +532,7 @@ mod tests {
}

#[test]
#[ignore = "InfluxData patch: chore: skip order calculation / exponential planning"]
fn test_union_equivalence_properties_constants_fill_gaps() -> Result<()> {
let schema = create_test_schema().unwrap();
UnionEquivalenceTest::new(&schema)
Expand Down Expand Up @@ -579,6 +608,7 @@ mod tests {
}

#[test]
#[ignore = "InfluxData patch: chore: skip order calculation / exponential planning"]
fn test_union_equivalence_properties_constants_fill_gaps_non_symmetric() -> Result<()>
{
let schema = create_test_schema().unwrap();
Expand Down Expand Up @@ -607,6 +637,7 @@ mod tests {
}

#[test]
#[ignore = "InfluxData patch: chore: skip order calculation / exponential planning"]
fn test_union_equivalence_properties_constants_gap_fill_symmetric() -> Result<()> {
let schema = create_test_schema().unwrap();
UnionEquivalenceTest::new(&schema)
Expand Down Expand Up @@ -658,6 +689,7 @@ mod tests {
}

#[test]
#[ignore = "InfluxData patch: chore: skip order calculation / exponential planning"]
fn test_union_equivalence_properties_constants_middle_desc() -> Result<()> {
let schema = create_test_schema().unwrap();
UnionEquivalenceTest::new(&schema)
Expand Down
6 changes: 3 additions & 3 deletions datafusion/physical-optimizer/src/enforce_sorting/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ use crate::enforce_sorting::sort_pushdown::{
};
use crate::output_requirements::OutputRequirementExec;
use crate::utils::{
add_sort_above, add_sort_above_with_check, is_coalesce_partitions, is_limit,
is_repartition, is_sort, is_sort_preserving_merge, is_union, is_window,
add_sort_above, add_sort_above_with_check, is_aggregation, is_coalesce_partitions,
is_limit, is_repartition, is_sort, is_sort_preserving_merge, is_union, is_window,
};
use crate::PhysicalOptimizerRule;

Expand Down Expand Up @@ -678,7 +678,7 @@ fn remove_bottleneck_in_subplan(
) -> Result<PlanWithCorrespondingCoalescePartitions> {
let plan = &requirements.plan;
let children = &mut requirements.children;
if is_coalesce_partitions(&children[0].plan) {
if is_coalesce_partitions(&children[0].plan) && !is_aggregation(plan) {
// We can safely use the 0th index since we have a `CoalescePartitionsExec`.
let mut new_child_node = children[0].children.swap_remove(0);
while new_child_node.plan.output_partitioning() == plan.output_partitioning()
Expand Down
10 changes: 10 additions & 0 deletions datafusion/physical-optimizer/src/sanity_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported};
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion_physical_plan::joins::SymmetricHashJoinExec;
use datafusion_physical_plan::sorts::sort::SortExec;
use datafusion_physical_plan::union::UnionExec;
use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};

use crate::PhysicalOptimizerRule;
Expand Down Expand Up @@ -135,6 +137,14 @@ pub fn check_plan_sanity(
plan.required_input_ordering(),
plan.required_input_distribution(),
) {
// TEMP HACK WORKAROUND https://github.com/apache/datafusion/issues/11492
if child.as_any().downcast_ref::<UnionExec>().is_some() {
continue;
}
if child.as_any().downcast_ref::<SortExec>().is_some() {
continue;
}

let child_eq_props = child.equivalence_properties();
if let Some(sort_req) = sort_req {
let sort_req = sort_req.into_single();
Expand Down
6 changes: 6 additions & 0 deletions datafusion/physical-optimizer/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::Arc;

use datafusion_common::Result;
use datafusion_physical_expr::{LexOrdering, LexRequirement};
use datafusion_physical_plan::aggregates::AggregateExec;
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion_physical_plan::repartition::RepartitionExec;
Expand Down Expand Up @@ -113,3 +114,8 @@ pub fn is_repartition(plan: &Arc<dyn ExecutionPlan>) -> bool {
pub fn is_limit(plan: &Arc<dyn ExecutionPlan>) -> bool {
plan.as_any().is::<GlobalLimitExec>() || plan.as_any().is::<LocalLimitExec>()
}

/// Returns `true` when the given operator is an [`AggregateExec`].
pub fn is_aggregation(plan: &Arc<dyn ExecutionPlan>) -> bool {
    plan.as_any().downcast_ref::<AggregateExec>().is_some()
}
23 changes: 23 additions & 0 deletions datafusion/sqllogictest/test_files/aggregate.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1821,6 +1821,29 @@ c 122
d 124
e 115


# using approx_percentile_cont on 2 columns with same signature
query TII
SELECT c1, approx_percentile_cont(c2, 0.95) AS c2, approx_percentile_cont(c3, 0.95) AS c3 FROM aggregate_test_100 GROUP BY 1 ORDER BY 1
----
a 5 73
b 5 68
c 5 122
d 5 124
e 5 115

# error is unique to this UDAF
query TRR
SELECT c1, avg(c2) AS c2, avg(c3) AS c3 FROM aggregate_test_100 GROUP BY 1 ORDER BY 1
----
a 2.857142857143 -18.333333333333
b 3.263157894737 -5.842105263158
c 2.666666666667 -1.333333333333
d 2.444444444444 25.444444444444
e 3 40.333333333333



query TI
SELECT c1, approx_percentile_cont(0.95) WITHIN GROUP (ORDER BY c3 DESC) AS c3_p95 FROM aggregate_test_100 GROUP BY 1 ORDER BY 1
----
Expand Down
Loading