Skip to content

Commit 313b348

Browse files
committed
Merge branch '14550_issue' of github.com:zhuqi-lucas/arrow-datafusion into 14550_issue
2 parents 2c73fcd + 72eaf24 commit 313b348

File tree

7 files changed

+242
-107
lines changed

7 files changed

+242
-107
lines changed

datafusion/common/src/dfschema.rs

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1002,12 +1002,12 @@ pub trait SchemaExt {
10021002
/// It works the same as [`DFSchema::equivalent_names_and_types`].
10031003
fn equivalent_names_and_types(&self, other: &Self) -> bool;
10041004

1005-
/// Returns true if the two schemas have the same qualified named
1006-
/// fields with logically equivalent data types. Returns false otherwise.
1005+
/// Returns `Ok(())` if the two schemas have the same qualified named
1006+
/// fields with logically equivalent data types. Returns an `Err` containing a reason string otherwise.
10071007
///
10081008
/// Use [DFSchema]::equivalent_names_and_types for stricter semantic type
10091009
/// equivalence checking.
1010-
fn logically_equivalent_names_and_types(&self, other: &Self) -> bool;
1010+
fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<(), String>;
10111011
}
10121012

10131013
impl SchemaExt for Schema {
@@ -1028,20 +1028,48 @@ impl SchemaExt for Schema {
10281028
})
10291029
}
10301030

1031-
fn logically_equivalent_names_and_types(&self, other: &Self) -> bool {
1031+
// There are three cases we need to check
1032+
// 1. The len of the schema of the plan and the schema of the table should be the same
1033+
// 2. The nullable flag of the schema of the plan and the schema of the table should be the same
1034+
// 3. The datatype of the schema of the plan and the schema of the table should be the same
1035+
fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<(), String> {
10321036
if self.fields().len() != other.fields().len() {
1033-
return false;
1037+
return Err(format!(
1038+
"Inserting query must have the same schema length as the table. \
1039+
Expected table schema length: {}, got: {}",
1040+
self.fields().len(),
1041+
other.fields().len()
1042+
));
10341043
}
10351044

10361045
self.fields()
10371046
.iter()
10381047
.zip(other.fields().iter())
1039-
.all(|(f1, f2)| {
1040-
f1.name() == f2.name()
1041-
&& DFSchema::datatype_is_logically_equal(
1048+
.try_for_each(|(f1, f2)| {
1049+
if f1.is_nullable() != f2.is_nullable() {
1050+
return Err(format!(
1051+
"Inserting query must have the same schema nullability as the table. \
1052+
Expected table field '{}' nullability: {}, got field: '{}', nullability: {}",
1053+
f1.name(),
1054+
f1.is_nullable(),
1055+
f2.name(),
1056+
f2.is_nullable()
1057+
));
1058+
}
1059+
1060+
if f1.name() != f2.name()
1061+
|| !DFSchema::datatype_is_logically_equal(f1.data_type(), f2.data_type())
1062+
{
1063+
return Err(format!(
1064+
"Inserting query schema mismatch: Expected table field '{}' with type {:?}, but got '{}' with type {:?}.",
1065+
f1.name(),
10421066
f1.data_type(),
1043-
f2.data_type(),
1044-
)
1067+
f2.name(),
1068+
f2.data_type()
1069+
));
1070+
}
1071+
1072+
Ok(())
10451073
})
10461074
}
10471075
}

datafusion/core/src/datasource/listing/table.rs

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -995,27 +995,12 @@ impl TableProvider for ListingTable {
995995
input: Arc<dyn ExecutionPlan>,
996996
insert_op: InsertOp,
997997
) -> Result<Arc<dyn ExecutionPlan>> {
998-
// Check that the schema of the plan matches the schema of this table.
999-
if !self
998+
match self
1000999
.schema()
10011000
.logically_equivalent_names_and_types(&input.schema())
10021001
{
1003-
// Return an error if schema of the input query does not match with the table schema.
1004-
return plan_err!(
1005-
"Inserting query must have the same schema with the table. \
1006-
Expected: {:?}, got: {:?}",
1007-
self.schema()
1008-
.fields()
1009-
.iter()
1010-
.map(|field| field.data_type())
1011-
.collect::<Vec<_>>(),
1012-
input
1013-
.schema()
1014-
.fields()
1015-
.iter()
1016-
.map(|field| field.data_type())
1017-
.collect::<Vec<_>>()
1018-
);
1002+
Ok(_) => {}
1003+
Err(e) => return plan_err!("{}", e),
10191004
}
10201005

10211006
let table_path = &self.table_paths()[0];

datafusion/core/src/datasource/memory.rs

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -276,28 +276,14 @@ impl TableProvider for MemTable {
276276
// If we are inserting into the table, any sort order may be messed up so reset it here
277277
*self.sort_order.lock() = vec![];
278278

279-
// Create a physical plan from the logical plan.
280-
// Check that the schema of the plan matches the schema of this table.
281-
if !self
282-
.schema()
279+
match self
280+
.schema
283281
.logically_equivalent_names_and_types(&input.schema())
284282
{
285-
return plan_err!(
286-
"Inserting query must have the same schema with the table. \
287-
Expected: {:?}, got: {:?}",
288-
self.schema()
289-
.fields()
290-
.iter()
291-
.map(|field| field.data_type())
292-
.collect::<Vec<_>>(),
293-
input
294-
.schema()
295-
.fields()
296-
.iter()
297-
.map(|field| field.data_type())
298-
.collect::<Vec<_>>()
299-
);
283+
Ok(_) => {}
284+
Err(e) => return plan_err!("{}", e),
300285
}
286+
301287
if insert_op != InsertOp::Append {
302288
return not_impl_err!("{insert_op} not implemented for MemoryTable yet");
303289
}

datafusion/core/tests/dataframe/mod.rs

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5274,3 +5274,78 @@ async fn register_non_parquet_file() {
52745274
"1.json' does not match the expected extension '.parquet'"
52755275
);
52765276
}
5277+
5278+
// Test inserting into checking.
5279+
#[tokio::test]
5280+
async fn test_insert_into_checking() -> Result<()> {
5281+
// Create a new schema with one field called "a" of type Int64, and setting nullable to false
5282+
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
5283+
5284+
let session_ctx = SessionContext::new();
5285+
5286+
// Create and register the initial table with the provided schema and data
5287+
let initial_table = Arc::new(MemTable::try_new(schema.clone(), vec![vec![]])?);
5288+
session_ctx.register_table("t", initial_table.clone())?;
5289+
5290+
// There are three cases we need to check
5291+
// 1. The len of the schema of the plan and the schema of the table should be the same
5292+
// 2. The nullable flag of the schema of the plan and the schema of the table should be the same
5293+
// 3. The datatype of the schema of the plan and the schema of the table should be the same
5294+
5295+
// Test case 1:
5296+
let write_df = session_ctx.sql("values (1, 2), (3, 4)").await.unwrap();
5297+
5298+
match write_df
5299+
.write_table("t", DataFrameWriteOptions::new())
5300+
.await
5301+
{
5302+
Ok(_) => {}
5303+
Err(e) => {
5304+
assert_contains!(
5305+
e.to_string(),
5306+
"Inserting query must have the same schema length as the table."
5307+
);
5308+
}
5309+
}
5310+
5311+
// Test case 2:
5312+
let write_df = session_ctx.sql("values (null), (12)").await.unwrap();
5313+
5314+
match write_df
5315+
.write_table("t", DataFrameWriteOptions::new())
5316+
.await
5317+
{
5318+
Ok(_) => {}
5319+
Err(e) => {
5320+
assert_contains!(
5321+
e.to_string(),
5322+
"Inserting query must have the same schema nullability as the table."
5323+
);
5324+
}
5325+
}
5326+
5327+
// Setting nullable to true
5328+
// Make sure the nullable check go through
5329+
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
5330+
5331+
let session_ctx = SessionContext::new();
5332+
5333+
// Create and register the initial table with the provided schema and data
5334+
let initial_table = Arc::new(MemTable::try_new(schema.clone(), vec![vec![]])?);
5335+
session_ctx.register_table("t", initial_table.clone())?;
5336+
5337+
// Test case 3:
5338+
let write_df = session_ctx.sql("values ('a123'), ('b456')").await.unwrap();
5339+
5340+
match write_df
5341+
.write_table("t", DataFrameWriteOptions::new())
5342+
.await
5343+
{
5344+
Ok(_) => {}
5345+
Err(e) => {
5346+
assert_contains!(e.to_string(), "Inserting query schema mismatch: Expected table field 'a' with type Int64, but got 'column1' with type Utf8");
5347+
}
5348+
}
5349+
5350+
Ok(())
5351+
}

datafusion/sqllogictest/test_files/aggregate_skip_partial.slt

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ CREATE EXTERNAL TABLE aggregate_test_100 (
2626
c2 TINYINT NOT NULL,
2727
c3 SMALLINT NOT NULL,
2828
c4 SMALLINT,
29-
c5 INT,
29+
c5 INT NOT NULL,
3030
c6 BIGINT NOT NULL,
3131
c7 SMALLINT NOT NULL,
3232
c8 INT NOT NULL,
@@ -506,7 +506,7 @@ SELECT
506506
avg(c11) FILTER (WHERE c2 != 5)
507507
FROM aggregate_test_100 GROUP BY c1 ORDER BY c1;
508508
----
509-
a 2.5 0.449071887467
509+
a 2.5 0.449071887467
510510
b 2.642857142857 0.445486298629
511511
c 2.421052631579 0.422882117723
512512
d 2.125 0.518706191331
@@ -711,3 +711,6 @@ true false false false false true false NULL
711711

712712
statement ok
713713
DROP TABLE aggregate_test_100_bool
714+
715+
statement ok
716+
DROP TABLE aggregate_test_100

0 commit comments

Comments
 (0)