Skip to content

Commit 781da86

Browse files
Sevenannn and alamb
authored and committed
Don't preserve functional dependency when generating UNION logical plan (#44) (apache#12979)
* Don't preserve functional dependency when generating UNION logical plan
* Remove extra lines
1 parent f90780d commit 781da86

File tree

2 files changed

+56
-3
lines changed

2 files changed

+56
-3
lines changed

datafusion/core/src/dataframe/mod.rs

Lines changed: 48 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2623,6 +2623,54 @@ mod tests {
26232623
Ok(())
26242624
}
26252625

2626+
#[tokio::test]
2627+
async fn test_aggregate_with_union() -> Result<()> {
2628+
let df = test_table().await?;
2629+
2630+
let df1 = df
2631+
.clone()
2632+
// GROUP BY `c1`
2633+
.aggregate(vec![col("c1")], vec![min(col("c2"))])?
2634+
// SELECT `c1` , min(c2) as `result`
2635+
.select(vec![col("c1"), min(col("c2")).alias("result")])?;
2636+
let df2 = df
2637+
.clone()
2638+
// GROUP BY `c1`
2639+
.aggregate(vec![col("c1")], vec![max(col("c3"))])?
2640+
// SELECT `c1` , max(c3) as `result`
2641+
.select(vec![col("c1"), max(col("c3")).alias("result")])?;
2642+
2643+
let df_union = df1.union(df2)?;
2644+
let df = df_union
2645+
// GROUP BY `c1`
2646+
.aggregate(
2647+
vec![col("c1")],
2648+
vec![sum(col("result")).alias("sum_result")],
2649+
)?
2650+
// SELECT `c1`, sum(result) as `sum_result`
2651+
.select(vec![(col("c1")), col("sum_result")])?;
2652+
2653+
let df_results = df.collect().await?;
2654+
2655+
#[rustfmt::skip]
2656+
assert_batches_sorted_eq!(
2657+
[
2658+
"+----+------------+",
2659+
"| c1 | sum_result |",
2660+
"+----+------------+",
2661+
"| a | 84 |",
2662+
"| b | 69 |",
2663+
"| c | 124 |",
2664+
"| d | 126 |",
2665+
"| e | 121 |",
2666+
"+----+------------+"
2667+
],
2668+
&df_results
2669+
);
2670+
2671+
Ok(())
2672+
}
2673+
26262674
#[tokio::test]
26272675
async fn test_aggregate_subexpr() -> Result<()> {
26282676
let df = test_table().await?;

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 8 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -49,8 +49,8 @@ use datafusion_common::display::ToStringifiedPlan;
4949
use datafusion_common::file_options::file_type::FileType;
5050
use datafusion_common::{
5151
get_target_functional_dependencies, internal_err, not_impl_err, plan_datafusion_err,
52-
plan_err, Column, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue,
53-
TableReference, ToDFSchema, UnnestOptions,
52+
plan_err, Column, DFSchema, DFSchemaRef, DataFusionError, FunctionalDependencies,
53+
Result, ScalarValue, TableReference, ToDFSchema, UnnestOptions,
5454
};
5555
use datafusion_expr_common::type_coercion::binary::type_union_resolution;
5656

@@ -1402,7 +1402,12 @@ pub fn validate_unique_names<'a>(
14021402
pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result<LogicalPlan> {
14031403
// Temporarily use the schema from the left input and later rely on the analyzer to
14041404
// coerce the two schemas into a common one.
1405-
let schema = Arc::clone(left_plan.schema());
1405+
1406+
// Functional dependencies are not preserved by a UNION operation
1407+
let schema = (**left_plan.schema()).clone();
1408+
let schema =
1409+
Arc::new(schema.with_functional_dependencies(FunctionalDependencies::empty())?);
1410+
14061411
Ok(LogicalPlan::Union(Union {
14071412
inputs: vec![Arc::new(left_plan), Arc::new(right_plan)],
14081413
schema,

0 commit comments

Comments
 (0)