fix: incorrect NATURAL/USING JOIN schema #14102

Merged · 5 commits · Jan 14, 2025
71 changes: 35 additions & 36 deletions datafusion/expr/src/utils.rs
@@ -35,8 +35,8 @@ use datafusion_common::tree_node::{
};
use datafusion_common::utils::get_at_indices;
use datafusion_common::{
- internal_err, plan_datafusion_err, plan_err, Column, DFSchema, DFSchemaRef,
- DataFusionError, HashMap, Result, TableReference,
+ internal_err, plan_datafusion_err, plan_err, Column, DFSchema, DFSchemaRef, HashMap,
+ Result, TableReference,
};

use indexmap::IndexSet;
@@ -379,14 +379,12 @@ fn get_exprs_except_skipped(
}
}

- /// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s.
- pub fn expand_wildcard(
- schema: &DFSchema,
- plan: &LogicalPlan,
- wildcard_options: Option<&WildcardOptions>,
- ) -> Result<Vec<Expr>> {
+ /// For each column specified in the USING JOIN condition, the JOIN plan outputs it twice
+ /// (once for each join side), but an unqualified wildcard should include it only once.
+ /// This function returns the columns that should be excluded.
+ fn exclude_using_columns(plan: &LogicalPlan) -> Result<HashSet<Column>> {
Member Author:
This function is extracted from expand_wildcard, so that we can reuse it in exprlist_to_fields.

let using_columns = plan.using_columns()?;
Member Author:
using_columns() finds join condition columns by traversing the plan tree. This approach might be unsafe, as it can pick up columns that are not relevant to the current SQL context. For example, the result of the query below differs from that of other databases.

create table t(a int);
insert into t values(1),(2),(3);
select * from (select t.a+2 as a from t join t t2 using(a)) as t2;

Contributor:
is this something we should file a ticket to track?

Member Author:
@alamb Filed #14118

- let mut columns_to_skip = using_columns
+ let excluded = using_columns
.into_iter()
// For each USING JOIN condition, only expand to one of each join column in projection
.flat_map(|cols| {
@@ -395,18 +393,26 @@ pub fn expand_wildcard(
// qualified column
cols.sort();
let mut out_column_names: HashSet<String> = HashSet::new();
- cols.into_iter()
- .filter_map(|c| {
- if out_column_names.contains(&c.name) {
- Some(c)
- } else {
- out_column_names.insert(c.name);
- None
- }
- })
- .collect::<Vec<_>>()
+ cols.into_iter().filter_map(move |c| {
+ if out_column_names.contains(&c.name) {
+ Some(c)
+ } else {
+ out_column_names.insert(c.name);
+ None
+ }
+ })
})
.collect::<HashSet<_>>();
+ Ok(excluded)
+ }
+
+ /// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s.
+ pub fn expand_wildcard(
+ schema: &DFSchema,
+ plan: &LogicalPlan,
+ wildcard_options: Option<&WildcardOptions>,
+ ) -> Result<Vec<Expr>> {
+ let mut columns_to_skip = exclude_using_columns(plan)?;
let excluded_columns = if let Some(WildcardOptions {
exclude: opt_exclude,
except: opt_except,
@@ -705,27 +711,20 @@ pub fn exprlist_to_fields<'a>(
.map(|e| match e {
Expr::Wildcard { qualifier, options } => match qualifier {
Member Author (jonahgao, Jan 13, 2025):
Although we have moved wildcard expansion to the analyzer in #11681, it still happens when computing plan schemas (in exprlist_to_fields and exprlist_len). I wonder if performing wildcard expansion before computing schemas would be simpler; at least it would avoid the duplicated work.

Member Author:
Another issue is that exprlist_to_fields does not handle replace items, but ExpandWildcardRule does. As a result, select * replace ('foo' as a) from t reports the wrong datatype in the schema before the analyzer runs.

None => {
- let excluded: Vec<String> = get_excluded_columns(
+ let mut excluded = exclude_using_columns(plan)?;
+ excluded.extend(get_excluded_columns(
options.exclude.as_ref(),
options.except.as_ref(),
wildcard_schema,
None,
- )?
- .into_iter()
- .map(|c| c.flat_name())
- .collect();
- Ok::<_, DataFusionError>(
- wildcard_schema
- .field_names()
- .iter()
- .enumerate()
- .filter(|(_, s)| !excluded.contains(s))
- .map(|(i, _)| wildcard_schema.qualified_field(i))
- .map(|(qualifier, f)| {
- (qualifier.cloned(), Arc::new(f.to_owned()))
- })
- .collect::<Vec<_>>(),
- )
+ )?);
+ Ok(wildcard_schema
+ .iter()
+ .filter(|(q, f)| {
+ !excluded.contains(&Column::new(q.cloned(), f.name()))
+ })
+ .map(|(q, f)| (q.cloned(), Arc::clone(f)))
+ .collect::<Vec<_>>())
}
Some(qualifier) => {
let excluded: Vec<String> = get_excluded_columns(
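As an illustrative aside (not part of this diff): a minimal standalone sketch of the deduplication idea behind the exclude_using_columns helper added above, using toy (qualifier, name) pairs instead of DataFusion's Column and LogicalPlan types. The names columns_to_exclude and ToyColumn are hypothetical and exist only in this sketch. For each group of same-named columns introduced by a USING condition, the first occurrence after sorting stays in the wildcard expansion and the rest are returned for exclusion.

```rust
use std::collections::HashSet;

// Toy stand-in for DataFusion's qualified column: (qualifier, column name).
type ToyColumn = (String, String);

// For each group of same-named columns produced by a USING condition,
// keep the first occurrence (after sorting, so the choice is deterministic)
// and return the rest, i.e. the columns a wildcard expansion should skip.
fn columns_to_exclude(using_groups: Vec<Vec<ToyColumn>>) -> HashSet<ToyColumn> {
    using_groups
        .into_iter()
        .flat_map(|mut cols| {
            cols.sort();
            let mut seen: HashSet<String> = HashSet::new();
            cols.into_iter().filter_map(move |(qualifier, name)| {
                if seen.insert(name.clone()) {
                    None // first occurrence of this name stays in the expansion
                } else {
                    Some((qualifier, name)) // later duplicates are excluded
                }
            })
        })
        .collect()
}

fn main() {
    // `a` appears on both sides of `t1 JOIN t2 USING (a)`.
    let groups = vec![vec![
        ("t1".to_string(), "a".to_string()),
        ("t2".to_string(), "a".to_string()),
    ]];
    let excluded = columns_to_exclude(groups);
    assert_eq!(
        excluded,
        HashSet::from([("t2".to_string(), "a".to_string())])
    );
}
```

Sorting each group first makes the surviving column deterministic, mirroring the cols.sort() call in the real implementation.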
74 changes: 74 additions & 0 deletions datafusion/sql/tests/sql_integration.rs
@@ -4552,3 +4552,77 @@ fn test_error_message_invalid_window_aggregate_function_signature() {
"Error during planning: sum does not support zero arguments",
);
}

// Test issue: https://github.com/apache/datafusion/issues/14058
// Select with wildcard over a USING/NATURAL JOIN should deduplicate condition columns.
#[test]
fn test_using_join_wildcard_schema() {
let sql = "SELECT * FROM orders o1 JOIN orders o2 USING (order_id)";
let plan = logical_plan(sql).unwrap();
let count = plan
.schema()
.iter()
.filter(|(_, f)| f.name() == "order_id")
.count();
// Only one order_id column
assert_eq!(count, 1);

let sql = "SELECT * FROM orders o1 NATURAL JOIN orders o2";
let plan = logical_plan(sql).unwrap();
// Only columns from one join side should be present
let expected_fields = vec![
"o1.order_id".to_string(),
"o1.customer_id".to_string(),
"o1.o_item_id".to_string(),
"o1.qty".to_string(),
"o1.price".to_string(),
"o1.delivered".to_string(),
];
assert_eq!(plan.schema().field_names(), expected_fields);

// Reproducible example of issue #14058
let sql = "WITH t1 AS (SELECT 1 AS id, 'a' AS value1),
t2 AS (SELECT 1 AS id, 'x' AS value2)
SELECT * FROM t1 NATURAL JOIN t2";
let plan = logical_plan(sql).unwrap();
assert_eq!(
plan.schema().field_names(),
[
"t1.id".to_string(),
"t1.value1".to_string(),
"t2.value2".to_string()
]
);

// Multiple joins
let sql = "WITH t1 AS (SELECT 1 AS a, 1 AS b),
t2 AS (SELECT 1 AS a, 2 AS c),
t3 AS (SELECT 1 AS c, 2 AS d)
SELECT * FROM t1 NATURAL JOIN t2 RIGHT JOIN t3 USING (c)";
let plan = logical_plan(sql).unwrap();
assert_eq!(
plan.schema().field_names(),
[
"t1.a".to_string(),
"t1.b".to_string(),
"t2.c".to_string(),
"t3.d".to_string()
]
);

// Subquery
let sql = "WITH t1 AS (SELECT 1 AS a, 1 AS b),
t2 AS (SELECT 1 AS a, 2 AS c),
t3 AS (SELECT 1 AS c, 2 AS d)
SELECT * FROM (SELECT * FROM t1 LEFT JOIN t2 USING(a)) NATURAL JOIN t3";
let plan = logical_plan(sql).unwrap();
assert_eq!(
plan.schema().field_names(),
[
"t1.a".to_string(),
"t1.b".to_string(),
"t2.c".to_string(),
"t3.d".to_string()
]
);
}
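For completeness, a minimal end-to-end sketch of the behavior these tests assert, run through a SessionContext rather than the planner-level test harness. It assumes the datafusion and tokio crates as dependencies and reuses the issue #14058 reproduction from the test above; with the fix, the unqualified wildcard over the NATURAL JOIN exposes the id join key only once.

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let df = ctx
        .sql(
            "WITH t1 AS (SELECT 1 AS id, 'a' AS value1), \
                  t2 AS (SELECT 1 AS id, 'x' AS value2) \
             SELECT * FROM t1 NATURAL JOIN t2",
        )
        .await?;
    // Expect the deduplicated schema asserted in the test above:
    // ["t1.id", "t1.value1", "t2.value2"], with no second `id` column.
    println!("{:?}", df.schema().field_names());
    df.show().await?;
    Ok(())
}
```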