Skip to content

Commit 42eabb9

Browse files
authored
bug: improve schema checking for insert into cases (#14572)
* Address comments * Address comments * Address comments * Address comment * Address new comments
1 parent d79160b commit 42eabb9

File tree

7 files changed

+112
-64
lines changed

7 files changed

+112
-64
lines changed

datafusion/common/src/dfschema.rs

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1002,12 +1002,14 @@ pub trait SchemaExt {
10021002
/// It works the same as [`DFSchema::equivalent_names_and_types`].
10031003
fn equivalent_names_and_types(&self, other: &Self) -> bool;
10041004

1005-
/// Returns true if the two schemas have the same qualified named
1006-
/// fields with logically equivalent data types. Returns false otherwise.
1005+
/// Returns `Ok(())` if the two schemas have the same qualified named
1006+
/// fields with logically equivalent data types. Returns a plan error otherwise.
10071007
///
10081008
/// Use [DFSchema]::equivalent_names_and_types for stricter semantic type
10091009
/// equivalence checking.
1010-
fn logically_equivalent_names_and_types(&self, other: &Self) -> bool;
1010+
///
1011+
/// It is only used for insert-into cases.
1012+
fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()>;
10111013
}
10121014

10131015
impl SchemaExt for Schema {
@@ -1028,21 +1030,36 @@ impl SchemaExt for Schema {
10281030
})
10291031
}
10301032

1031-
fn logically_equivalent_names_and_types(&self, other: &Self) -> bool {
1033+
// It is only used for insert-into cases.
1034+
fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()> {
1035+
// case 1 : schema length mismatch
10321036
if self.fields().len() != other.fields().len() {
1033-
return false;
1037+
_plan_err!(
1038+
"Inserting query must have the same schema length as the table. \
1039+
Expected table schema length: {}, got: {}",
1040+
self.fields().len(),
1041+
other.fields().len()
1042+
)
1043+
} else {
1044+
// case 2 : schema length match, but fields mismatch
1045+
// check if the fields name are the same and have the same data types
1046+
self.fields()
1047+
.iter()
1048+
.zip(other.fields().iter())
1049+
.try_for_each(|(f1, f2)| {
1050+
if f1.name() != f2.name() || !DFSchema::datatype_is_logically_equal(f1.data_type(), f2.data_type()) {
1051+
_plan_err!(
1052+
"Inserting query schema mismatch: Expected table field '{}' with type {:?}, \
1053+
but got '{}' with type {:?}.",
1054+
f1.name(),
1055+
f1.data_type(),
1056+
f2.name(),
1057+
f2.data_type())
1058+
} else {
1059+
Ok(())
1060+
}
1061+
})
10341062
}
1035-
1036-
self.fields()
1037-
.iter()
1038-
.zip(other.fields().iter())
1039-
.all(|(f1, f2)| {
1040-
f1.name() == f2.name()
1041-
&& DFSchema::datatype_is_logically_equal(
1042-
f1.data_type(),
1043-
f2.data_type(),
1044-
)
1045-
})
10461063
}
10471064
}
10481065

datafusion/core/src/datasource/listing/table.rs

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -996,27 +996,8 @@ impl TableProvider for ListingTable {
996996
insert_op: InsertOp,
997997
) -> Result<Arc<dyn ExecutionPlan>> {
998998
// Check that the schema of the plan matches the schema of this table.
999-
if !self
1000-
.schema()
1001-
.logically_equivalent_names_and_types(&input.schema())
1002-
{
1003-
// Return an error if schema of the input query does not match with the table schema.
1004-
return plan_err!(
1005-
"Inserting query must have the same schema with the table. \
1006-
Expected: {:?}, got: {:?}",
1007-
self.schema()
1008-
.fields()
1009-
.iter()
1010-
.map(|field| field.data_type())
1011-
.collect::<Vec<_>>(),
1012-
input
1013-
.schema()
1014-
.fields()
1015-
.iter()
1016-
.map(|field| field.data_type())
1017-
.collect::<Vec<_>>()
1018-
);
1019-
}
999+
self.schema()
1000+
.logically_equivalent_names_and_types(&input.schema())?;
10201001

10211002
let table_path = &self.table_paths()[0];
10221003
if !table_path.is_collection() {

datafusion/core/src/datasource/memory.rs

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -278,26 +278,9 @@ impl TableProvider for MemTable {
278278

279279
// Create a physical plan from the logical plan.
280280
// Check that the schema of the plan matches the schema of this table.
281-
if !self
282-
.schema()
283-
.logically_equivalent_names_and_types(&input.schema())
284-
{
285-
return plan_err!(
286-
"Inserting query must have the same schema with the table. \
287-
Expected: {:?}, got: {:?}",
288-
self.schema()
289-
.fields()
290-
.iter()
291-
.map(|field| field.data_type())
292-
.collect::<Vec<_>>(),
293-
input
294-
.schema()
295-
.fields()
296-
.iter()
297-
.map(|field| field.data_type())
298-
.collect::<Vec<_>>()
299-
);
300-
}
281+
self.schema()
282+
.logically_equivalent_names_and_types(&input.schema())?;
283+
301284
if insert_op != InsertOp::Append {
302285
return not_impl_err!("{insert_op} not implemented for MemoryTable yet");
303286
}

datafusion/core/tests/dataframe/mod.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5274,3 +5274,55 @@ async fn register_non_parquet_file() {
52745274
"1.json' does not match the expected extension '.parquet'"
52755275
);
52765276
}
5277+
5278+
// Test schema checking for insert-into cases.
5279+
#[tokio::test]
5280+
async fn test_insert_into_checking() -> Result<()> {
5281+
// Create a new schema with one field called "a" of type Int64, and setting nullable to false
5282+
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
5283+
5284+
let session_ctx = SessionContext::new();
5285+
5286+
// Create and register the initial table with the provided schema and data
5287+
let initial_table = Arc::new(MemTable::try_new(schema.clone(), vec![vec![]])?);
5288+
session_ctx.register_table("t", initial_table.clone())?;
5289+
5290+
// There are two cases we need to check
5291+
// 1. The len of the schema of the plan and the schema of the table should be the same
5292+
// 2. The datatype of the schema of the plan and the schema of the table should be the same
5293+
5294+
// Test case 1:
5295+
let write_df = session_ctx.sql("values (1, 2), (3, 4)").await.unwrap();
5296+
5297+
let e = write_df
5298+
.write_table("t", DataFrameWriteOptions::new())
5299+
.await
5300+
.unwrap_err();
5301+
5302+
assert_contains!(
5303+
e.to_string(),
5304+
"Inserting query must have the same schema length as the table."
5305+
);
5306+
5307+
// Setting nullable to true
5308+
// Make sure the nullable check go through
5309+
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
5310+
5311+
let session_ctx = SessionContext::new();
5312+
5313+
// Create and register the initial table with the provided schema and data
5314+
let initial_table = Arc::new(MemTable::try_new(schema.clone(), vec![vec![]])?);
5315+
session_ctx.register_table("t", initial_table.clone())?;
5316+
5317+
// Test case 2:
5318+
let write_df = session_ctx.sql("values ('a123'), ('b456')").await.unwrap();
5319+
5320+
let e = write_df
5321+
.write_table("t", DataFrameWriteOptions::new())
5322+
.await
5323+
.unwrap_err();
5324+
5325+
assert_contains!(e.to_string(), "Inserting query schema mismatch: Expected table field 'a' with type Int64, but got 'column1' with type Utf8");
5326+
5327+
Ok(())
5328+
}

datafusion/sqllogictest/test_files/aggregate_skip_partial.slt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ set datafusion.execution.batch_size = 4;
220220

221221
# Inserting into nullable table with batch_size specified above
222222
# to prevent creation on single in-memory batch
223+
223224
statement ok
224225
CREATE TABLE aggregate_test_100_null (
225226
c2 TINYINT NOT NULL,
@@ -506,7 +507,7 @@ SELECT
506507
avg(c11) FILTER (WHERE c2 != 5)
507508
FROM aggregate_test_100 GROUP BY c1 ORDER BY c1;
508509
----
509-
a 2.5 0.449071887467
510+
a 2.5 0.449071887467
510511
b 2.642857142857 0.445486298629
511512
c 2.421052631579 0.422882117723
512513
d 2.125 0.518706191331

datafusion/sqllogictest/test_files/insert.slt

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -296,8 +296,11 @@ insert into table_without_values(field1) values(3);
296296
1
297297

298298
# insert NULL values for the missing column (field1), but column is non-nullable
299-
statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable
299+
statement error
300300
insert into table_without_values(field2) values(300);
301+
----
302+
DataFusion error: Execution error: Invalid batch column at '0' has null but schema specifies non-nullable
303+
301304

302305
statement error Invalid argument error: Column 'column1' is declared as non-nullable but contains null values
303306
insert into table_without_values values(NULL, 300);
@@ -358,7 +361,7 @@ statement ok
358361
create table test_column_defaults(
359362
a int,
360363
b int not null default null,
361-
c int default 100*2+300,
364+
c int default 100*2+300,
362365
d text default lower('DEFAULT_TEXT'),
363366
e timestamp default now()
364367
)
@@ -368,8 +371,11 @@ insert into test_column_defaults values(1, 10, 100, 'ABC', now())
368371
----
369372
1
370373

371-
statement error DataFusion error: Execution error: Invalid batch column at '1' has null but schema specifies non-nullable
374+
statement error
372375
insert into test_column_defaults(a) values(2)
376+
----
377+
DataFusion error: Execution error: Invalid batch column at '1' has null but schema specifies non-nullable
378+
373379

374380
query I
375381
insert into test_column_defaults(b) values(20)
@@ -412,7 +418,7 @@ statement ok
412418
create table test_column_defaults(
413419
a int,
414420
b int not null default null,
415-
c int default 100*2+300,
421+
c int default 100*2+300,
416422
d text default lower('DEFAULT_TEXT'),
417423
e timestamp default now()
418424
) as values(1, 10, 100, 'ABC', now())

datafusion/sqllogictest/test_files/insert_to_external.slt

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ STORED AS parquet
6060
LOCATION 'test_files/scratch/insert_to_external/parquet_types_partitioned/'
6161
PARTITIONED BY (b);
6262

63+
# query error here because PARTITIONED BY (b) makes column b non-nullable
6364
query I
6465
insert into dictionary_encoded_parquet_partitioned
6566
select * from dictionary_encoded_values
@@ -81,6 +82,7 @@ STORED AS arrow
8182
LOCATION 'test_files/scratch/insert_to_external/arrow_dict_partitioned/'
8283
PARTITIONED BY (b);
8384

85+
# query error here because PARTITIONED BY (b) makes column b non-nullable
8486
query I
8587
insert into dictionary_encoded_arrow_partitioned
8688
select * from dictionary_encoded_values
@@ -543,8 +545,11 @@ insert into table_without_values(field1) values(3);
543545
1
544546

545547
# insert NULL values for the missing column (field1), but column is non-nullable
546-
statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable
548+
statement error
547549
insert into table_without_values(field2) values(300);
550+
----
551+
DataFusion error: Execution error: Invalid batch column at '0' has null but schema specifies non-nullable
552+
548553

549554
statement error Invalid argument error: Column 'column1' is declared as non-nullable but contains null values
550555
insert into table_without_values values(NULL, 300);
@@ -581,8 +586,11 @@ insert into test_column_defaults values(1, 10, 100, 'ABC', now())
581586
----
582587
1
583588

584-
statement error DataFusion error: Execution error: Invalid batch column at '1' has null but schema specifies non-nullable
589+
statement error
585590
insert into test_column_defaults(a) values(2)
591+
----
592+
DataFusion error: Execution error: Invalid batch column at '1' has null but schema specifies non-nullable
593+
586594

587595
query I
588596
insert into test_column_defaults(b) values(20)

0 commit comments

Comments
 (0)