Skip to content

Commit 98ad839

Browse files
Sevenannn authored and wiedld committed
Don't preserve functional dependency when generating UNION logical plan (#44) (apache#12979)
* Don't preserve functional dependency when generating UNION logical plan
* Remove extra lines
1 parent f78378f commit 98ad839

File tree

2 files changed

+56
-3
lines changed

2 files changed

+56
-3
lines changed

datafusion/core/src/dataframe/mod.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2606,6 +2606,54 @@ mod tests {
26062606
Ok(())
26072607
}
26082608

2609+
/// Regression test: aggregating on top of a UNION of two aggregated frames.
///
/// Builds two frames grouped by `c1` (one exposing `min(c2)`, the other
/// `max(c3)`, both aliased to `result`), unions them, then groups by `c1`
/// again and sums `result`. The expected table pins one summed row per `c1`.
#[tokio::test]
2610+
async fn test_aggregate_with_union() -> Result<()> {
2611+
// Shared input table; it is cloned below so both branches start from the same frame.
let df = test_table().await?;
2612+
2613+
let df1 = df
2614+
.clone()
2615+
// GROUP BY `c1`, computing min(c2) per group
2616+
.aggregate(vec![col("c1")], vec![min(col("c2"))])?
2617+
// SELECT `c1`, min(c2) AS `result`
2618+
.select(vec![col("c1"), min(col("c2")).alias("result")])?;
2619+
let df2 = df
2620+
.clone()
2621+
// GROUP BY `c1`, computing max(c3) per group
2622+
.aggregate(vec![col("c1")], vec![max(col("c3"))])?
2623+
// SELECT `c1`, max(c3) AS `result`
2624+
.select(vec![col("c1"), max(col("c3")).alias("result")])?;
2625+
2626+
// Both sides project (`c1`, `result`), so the union inputs have matching column names.
let df_union = df1.union(df2)?;
2627+
let df = df_union
2628+
// GROUP BY `c1` across the unioned rows (a `c1` value can come from either input)
2629+
.aggregate(
2630+
vec![col("c1")],
2631+
vec![sum(col("result")).alias("sum_result")],
2632+
)?
2633+
// SELECT `c1`, sum(result) AS `sum_result`
2634+
.select(vec![(col("c1")), col("sum_result")])?;
2635+
2636+
let df_results = df.collect().await?;
2637+
2638+
// Keep the hand-written expected table out of rustfmt's reach.
#[rustfmt::skip]
2639+
assert_batches_sorted_eq!(
2640+
[
2641+
"+----+------------+",
2642+
"| c1 | sum_result |",
2643+
"+----+------------+",
2644+
"| a | 84 |",
2645+
"| b | 69 |",
2646+
"| c | 124 |",
2647+
"| d | 126 |",
2648+
"| e | 121 |",
2649+
"+----+------------+"
2650+
],
2651+
&df_results
2652+
);
2653+
2654+
Ok(())
2655+
}
2656+
26092657
#[tokio::test]
26102658
async fn test_aggregate_subexpr() -> Result<()> {
26112659
let df = test_table().await?;

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ use datafusion_common::display::ToStringifiedPlan;
5050
use datafusion_common::file_options::file_type::FileType;
5151
use datafusion_common::{
5252
get_target_functional_dependencies, internal_err, not_impl_err, plan_datafusion_err,
53-
plan_err, Column, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue,
54-
TableReference, ToDFSchema, UnnestOptions,
53+
plan_err, Column, DFSchema, DFSchemaRef, DataFusionError, FunctionalDependencies,
54+
Result, ScalarValue, TableReference, ToDFSchema, UnnestOptions,
5555
};
5656

5757
use super::dml::InsertOp;
@@ -1401,7 +1401,12 @@ pub fn validate_unique_names<'a>(
14011401
pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result<LogicalPlan> {
14021402
// Temporarily use the schema from the left input and later rely on the analyzer to
14031403
// coerce the two schemas into a common one.
1404-
let schema = Arc::clone(left_plan.schema());
1404+
1405+
// Functional dependencies are not preserved by a UNION operation
1406+
let schema = (**left_plan.schema()).clone();
1407+
let schema =
1408+
Arc::new(schema.with_functional_dependencies(FunctionalDependencies::empty())?);
1409+
14051410
Ok(LogicalPlan::Union(Union {
14061411
inputs: vec![Arc::new(left_plan), Arc::new(right_plan)],
14071412
schema,

0 commit comments

Comments
 (0)