Skip to content

Commit 9e1e32f

Browse files
zhuqi-lucas and alamb authored
Support logic optimize rule to pass the case that Utf8view datatype combined with Utf8 datatype (#15239)
* Support logic optimize rule to pass the case that Utf8view datatype combined with Utf8 datatype
* Support logic optimize rule to pass the case that Utf8view datatype combined with Utf8 datatype
* fix test
* fix
* fix
* fmt
* clean code
* fix test
* Fix test
* address comments
* move docs

---------

Co-authored-by: Andrew Lamb <[email protected]>
1 parent 012c1f0 commit 9e1e32f

File tree

5 files changed

+63
-42
lines changed

5 files changed

+63
-42
lines changed

datafusion/common/src/dfschema.rs

Lines changed: 45 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -564,6 +564,7 @@ impl DFSchema {
564564
}
565565

566566
/// Check to see if fields in 2 Arrow schemas are compatible
567+
#[deprecated(since = "47.0.0", note = "This method is no longer used")]
567568
pub fn check_arrow_schema_type_compatible(
568569
&self,
569570
arrow_schema: &Schema,
@@ -604,26 +605,57 @@ impl DFSchema {
604605
})
605606
}
606607

607-
/// Returns true if the two schemas have the same qualified named
608-
/// fields with the same data types. Returns false otherwise.
608+
#[deprecated(since = "47.0.0", note = "Use `has_equivalent_names_and_types` instead")]
609+
pub fn equivalent_names_and_types(&self, other: &Self) -> bool {
610+
self.has_equivalent_names_and_types(other).is_ok()
611+
}
612+
613+
/// Returns Ok if the two schemas have the same qualified named
614+
/// fields with compatible data types.
609615
///
610-
/// This is a specialized version of Eq that ignores differences
611-
/// in nullability and metadata.
616+
/// Returns an `Err` with a message otherwise.
617+
///
618+
/// This is a specialized version of Eq that ignores differences in
619+
/// nullability and metadata.
612620
///
613621
/// Use [`DFSchema::logically_equivalent_names_and_types`] for a weaker
614622
/// logical type checking, which for example would consider a dictionary
615623
/// encoded UTF8 array to be equivalent to a plain UTF8 array.
616-
pub fn equivalent_names_and_types(&self, other: &Self) -> bool {
624+
pub fn has_equivalent_names_and_types(&self, other: &Self) -> Result<()> {
625+
// case 1 : schema length mismatch
617626
if self.fields().len() != other.fields().len() {
618-
return false;
627+
_plan_err!(
628+
"Schema mismatch: the schema length are not same \
629+
Expected schema length: {}, got: {}",
630+
self.fields().len(),
631+
other.fields().len()
632+
)
633+
} else {
634+
// case 2 : schema length match, but fields mismatch
635+
// check that the field names are the same and the data types are compatible
636+
self.fields()
637+
.iter()
638+
.zip(other.fields().iter())
639+
.try_for_each(|(f1, f2)| {
640+
if f1.name() != f2.name()
641+
|| (!DFSchema::datatype_is_semantically_equal(
642+
f1.data_type(),
643+
f2.data_type(),
644+
) && !can_cast_types(f2.data_type(), f1.data_type()))
645+
{
646+
_plan_err!(
647+
"Schema mismatch: Expected field '{}' with type {:?}, \
648+
but got '{}' with type {:?}.",
649+
f1.name(),
650+
f1.data_type(),
651+
f2.name(),
652+
f2.data_type()
653+
)
654+
} else {
655+
Ok(())
656+
}
657+
})
619658
}
620-
let self_fields = self.iter();
621-
let other_fields = other.iter();
622-
self_fields.zip(other_fields).all(|((q1, f1), (q2, f2))| {
623-
q1 == q2
624-
&& f1.name() == f2.name()
625-
&& Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
626-
})
627659
}
628660

629661
/// Checks if two [`DataType`]s are logically equal. This is a notably weaker constraint

datafusion/expr/src/logical_plan/invariants.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,11 +112,11 @@ fn assert_valid_semantic_plan(plan: &LogicalPlan) -> Result<()> {
112112
/// Returns an error if the plan does not have the expected schema.
113113
/// Ignores metadata and nullability.
114114
pub fn assert_expected_schema(schema: &DFSchemaRef, plan: &LogicalPlan) -> Result<()> {
115-
let equivalent = plan.schema().equivalent_names_and_types(schema);
115+
let compatible = plan.schema().has_equivalent_names_and_types(schema);
116116

117-
if !equivalent {
117+
if let Err(e) = compatible {
118118
internal_err!(
119-
"Failed due to a difference in schemas, original schema: {:?}, new schema: {:?}",
119+
"Failed due to a difference in schemas: {e}, original schema: {:?}, new schema: {:?}",
120120
schema,
121121
plan.schema()
122122
)

datafusion/optimizer/src/optimizer.rs

Lines changed: 7 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,7 @@ fn assert_valid_optimization(
447447
plan: &LogicalPlan,
448448
prev_schema: &Arc<DFSchema>,
449449
) -> Result<()> {
450-
// verify invariant: optimizer passes should not change the schema
450+
// verify invariant: optimizer passes must not change the schema, except to field types that can be cast from the previous schema's types.
451451
// Refer to <https://datafusion.apache.org/contributor-guide/specification/invariants.html#logical-schema-is-invariant-under-logical-optimization>
452452
assert_expected_schema(prev_schema, plan)?;
453453

@@ -459,7 +459,9 @@ mod tests {
459459
use std::sync::{Arc, Mutex};
460460

461461
use datafusion_common::tree_node::Transformed;
462-
use datafusion_common::{plan_err, DFSchema, DFSchemaRef, DataFusionError, Result};
462+
use datafusion_common::{
463+
assert_contains, plan_err, DFSchema, DFSchemaRef, DataFusionError, Result,
464+
};
463465
use datafusion_expr::logical_plan::EmptyRelation;
464466
use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder, Projection};
465467

@@ -505,28 +507,9 @@ mod tests {
505507
schema: Arc::new(DFSchema::empty()),
506508
});
507509
let err = opt.optimize(plan, &config, &observe).unwrap_err();
508-
assert!(err.strip_backtrace().starts_with(
509-
"Optimizer rule 'get table_scan rule' failed\n\
510-
caused by\n\
511-
Check optimizer-specific invariants after optimizer rule: get table_scan rule\n\
512-
caused by\n\
513-
Internal error: Failed due to a difference in schemas, \
514-
original schema: DFSchema { inner: Schema { \
515-
fields: [], \
516-
metadata: {} }, \
517-
field_qualifiers: [], \
518-
functional_dependencies: FunctionalDependencies { deps: [] } \
519-
}, \
520-
new schema: DFSchema { inner: Schema { \
521-
fields: [\
522-
Field { name: \"a\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, \
523-
Field { name: \"b\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, \
524-
Field { name: \"c\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }\
525-
], \
526-
metadata: {} }, \
527-
field_qualifiers: [Some(Bare { table: \"test\" }), Some(Bare { table: \"test\" }), Some(Bare { table: \"test\" })], \
528-
functional_dependencies: FunctionalDependencies { deps: [] } }",
529-
));
510+
511+
// Simplify assert to check the error message contains the expected message, which is only the schema length mismatch
512+
assert_contains!(err.strip_backtrace(), "Schema mismatch: the schema length are not same Expected schema length: 3, got: 0");
530513
}
531514

532515
#[test]

datafusion/sqllogictest/test_files/expr.slt

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -478,8 +478,14 @@ a
478478
statement ok
479479
create table foo (a varchar, b varchar) as values ('a', 'b');
480480

481+
482+
query T
483+
SELECT concat_ws('', a, b,'c') from foo
484+
----
485+
abc
486+
481487
query T
482-
SELECT concat_ws('',a,b,'c') from foo
488+
SELECT concat_ws('',arrow_cast(a, 'Utf8View'),arrow_cast(b, 'Utf8View'),'c') from foo
483489
----
484490
abc
485491

datafusion/substrait/src/logical_plan/consumer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -776,7 +776,7 @@ pub async fn from_substrait_plan_with_consumer(
776776
return Ok(plan);
777777
}
778778
let renamed_schema = make_renamed_schema(plan.schema(), &root.names)?;
779-
if renamed_schema.equivalent_names_and_types(plan.schema()) {
779+
if renamed_schema.has_equivalent_names_and_types(plan.schema()).is_ok() {
780780
// Nothing to do if the schema is already equivalent
781781
return Ok(plan);
782782
}

0 commit comments

Comments
 (0)