bug: improve schema checking for insert into cases #14572

Merged · 7 commits · Feb 17, 2025 · Changes from 5 commits
52 changes: 36 additions & 16 deletions datafusion/common/src/dfschema.rs
@@ -1002,12 +1002,12 @@ pub trait SchemaExt {
    /// It works the same as [`DFSchema::equivalent_names_and_types`].
    fn equivalent_names_and_types(&self, other: &Self) -> bool;

    /// Returns true if the two schemas have the same qualified named
    /// fields with logically equivalent data types. Returns false otherwise.
    /// Returns `Ok(())` if the two schemas have the same qualified named
    /// fields with logically equivalent data types. Returns a plan error otherwise.
    ///
    /// Use [DFSchema]::equivalent_names_and_types for stricter semantic type
    /// equivalence checking.
    fn logically_equivalent_names_and_types(&self, other: &Self) -> bool;
    fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()>;
}

impl SchemaExt for Schema {
@@ -1028,21 +1028,41 @@ impl SchemaExt for Schema {
        })
    }

    fn logically_equivalent_names_and_types(&self, other: &Self) -> bool {
    fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()> {
        if self.fields().len() != other.fields().len() {
            return false;
            _plan_err!(
                "Inserting query must have the same schema length as the table. \
                Expected table schema length: {}, got: {}",
                self.fields().len(),
                other.fields().len()
            )
        } else {
            self.fields()
                .iter()
                .zip(other.fields().iter())
                .try_for_each(|(f1, f2)| {
                    // only check the case when the table field is not nullable and the insert data field is nullable
                    if !f1.is_nullable() && f2.is_nullable() {
Member (@jonahgao):
This condition would prevent the following query from executing, but it works on both the main branch and Postgres.

create table t1(a int not null);
create table t2(a int);
insert into t2 values(100);
insert into t1 select * from t2;

As I mentioned earlier, we already have a check during execution called check_not_null_constraints, so I think we should not add this restriction here.
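
For context, a rough sketch of what such an execution-time check could look like (an illustration under assumptions, not DataFusion's actual check_not_null_constraints):

// Hypothetical sketch: reject a batch at execution time only if a column the
// table declares as non-nullable actually contains nulls. The names and
// signature here are assumptions for illustration.
use arrow_array::{Array, RecordBatch};
use arrow_schema::SchemaRef;
use datafusion_common::{exec_err, Result};

fn check_batch_not_null(table_schema: &SchemaRef, batch: &RecordBatch) -> Result<()> {
    for (i, field) in table_schema.fields().iter().enumerate() {
        // Only the actual data matters, not the declared nullability of the
        // input plan's schema, so the t2 -> t1 insert above still succeeds.
        if !field.is_nullable() && batch.column(i).null_count() > 0 {
            return exec_err!(
                "Invalid batch column at '{i}' has null but schema specifies non-nullable"
            );
        }
    }
    Ok(())
}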

Contributor Author (@zhuqi-lucas):

Thanks @jonahgao, got it now; this is a good example that explains the issue.

Contributor Author (@zhuqi-lucas):
@jonahgao I addressed the comments, but is the issue still valid? #14550

I can't find which bug we need to fix, thanks!

Member (@jonahgao):
I'm not sure if it's necessary to ensure that the schema of output batches has the same nullability. This issue exists not only with inserts but also with other queries like UNION.

DataFusion CLI v45.0.0
> create table t1(a int not null) as values(1);
0 row(s) fetched.
Elapsed 0.009 seconds.

> create table t2(a int) as values(2);
0 row(s) fetched.
Elapsed 0.011 seconds.

> select * from t1 union all select * from t2;
batch schema: Schema { fields: [Field { name: "a", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }
batch schema: Schema { fields: [Field { name: "a", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }
+---+
| a |
+---+
| 1 |
| 2 |
+---+
2 row(s) fetched.
Elapsed 0.007 seconds.

Member (@jonahgao):
If it is necessary, perhaps we should rewrite the nullability of the output batches instead of restricting the input schemas to have the same nullability, as the latter could prevent some queries from being executed.
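
As a sketch of that alternative (an assumed approach, not the actual fix), output batches could be rebuilt against the unified schema:

// Sketch: reuse the batch's columns but attach the desired output schema.
// Relaxing nullability (false -> true) always succeeds; tightening it fails
// inside try_new if a column actually contains nulls.
use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use datafusion_common::Result;

fn rewrite_batch_schema(batch: RecordBatch, target: SchemaRef) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(target, batch.columns().to_vec())?)
}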

Contributor Author (@zhuqi-lucas):
DataFusion CLI v45.0.0
> create table t1(a int not null) as values(1);
0 row(s) fetched.
Elapsed 0.012 seconds.

> create table t2(a int) as values(null);
0 row(s) fetched.
Elapsed 0.003 seconds.

> select * from t1 union all select * from t2;
+------+
| a    |
+------+
| NULL |
| 1    |
+------+
2 row(s) fetched.
Elapsed 0.004 seconds.

Thanks @jonahgao, got it now. We may need to unify the output schema so that a field is nullable when it is nullable in any input.

For example, in the case above, we need to make sure the output schema marks the field as nullable.
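
A minimal sketch of that unification rule (my reading of the comment, with assumed names): an output field is nullable whenever it is nullable in any input:

// Sketch: merge two input schemas so each output field is nullable whenever
// either input field is nullable. Assumes fields already correspond by position.
use arrow_schema::{Field, Schema};

fn unify_union_nullability(left: &Schema, right: &Schema) -> Schema {
    let fields: Vec<Field> = left
        .fields()
        .iter()
        .zip(right.fields().iter())
        .map(|(l, r)| {
            l.as_ref()
                .clone()
                .with_nullable(l.is_nullable() || r.is_nullable())
        })
        .collect();
    Schema::new(fields)
}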

Contributor Author (@zhuqi-lucas):
Updated. It seems several UNION issues are also related to union nullability:
#14352

                        _plan_err!(
                            "Inserting query must have the same schema nullability as the table. \
                            Expected table field '{}' nullability: {}, got field: '{}', nullability: {}",
                            f1.name(),
                            f1.is_nullable(),
                            f2.name(),
                            f2.is_nullable())
                    } else if f1.name() != f2.name() || !DFSchema::datatype_is_logically_equal(f1.data_type(), f2.data_type()) {
                        _plan_err!(
                            "Inserting query schema mismatch: Expected table field '{}' with type {:?}, \
                            but got '{}' with type {:?}.",
                            f1.name(),
                            f1.data_type(),
                            f2.name(),
                            f2.data_type())
                    } else {
                        Ok(())
                    }
                })
        }
    }

        self.fields()
            .iter()
            .zip(other.fields().iter())
            .all(|(f1, f2)| {
                f1.name() == f2.name()
                    && DFSchema::datatype_is_logically_equal(
                        f1.data_type(),
                        f2.data_type(),
                    )
            })
    }
}

Member (@jonahgao):

Based on the function name, it seems that we haven't restricted it to only be used for insertion 🤔

Contributor Author (@zhuqi-lucas):

Checked: the function is only called from insert-into cases.

Contributor:

Perhaps we could update the comments to reflect this.

Contributor Author (@zhuqi-lucas):

Added comments in the latest PR, thanks all.
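
As an aside, a hedged illustration of what "logically equal" means here: a dictionary-encoded string column is treated as logically equivalent to a plain string column. The helper's visibility may differ across DataFusion versions, so treat this as a sketch:

// Sketch of logical vs. strict type equality; assumes the helper is reachable.
use arrow_schema::DataType;
use datafusion_common::DFSchema;

fn demo_logical_equality() {
    let plain = DataType::Utf8;
    let dict = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    // Logically equal: both yield string values once decoded.
    assert!(DFSchema::datatype_is_logically_equal(&plain, &dict));
    // But not strictly equal as Arrow types.
    assert_ne!(plain, dict);
}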

23 changes: 2 additions & 21 deletions datafusion/core/src/datasource/listing/table.rs
@@ -996,27 +996,8 @@ impl TableProvider for ListingTable {
        insert_op: InsertOp,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Check that the schema of the plan matches the schema of this table.
        if !self
            .schema()
            .logically_equivalent_names_and_types(&input.schema())
        {
            // Return an error if schema of the input query does not match with the table schema.
            return plan_err!(
                "Inserting query must have the same schema with the table. \
                Expected: {:?}, got: {:?}",
                self.schema()
                    .fields()
                    .iter()
                    .map(|field| field.data_type())
                    .collect::<Vec<_>>(),
                input
                    .schema()
                    .fields()
                    .iter()
                    .map(|field| field.data_type())
                    .collect::<Vec<_>>()
            );
        }
        self.schema()
            .logically_equivalent_names_and_types(&input.schema())?;

        let table_path = &self.table_paths()[0];
        if !table_path.is_collection() {
23 changes: 3 additions & 20 deletions datafusion/core/src/datasource/memory.rs
@@ -278,26 +278,9 @@ impl TableProvider for MemTable {

        // Create a physical plan from the logical plan.
        // Check that the schema of the plan matches the schema of this table.
        if !self
            .schema()
            .logically_equivalent_names_and_types(&input.schema())
        {
            return plan_err!(
                "Inserting query must have the same schema with the table. \
                Expected: {:?}, got: {:?}",
                self.schema()
                    .fields()
                    .iter()
                    .map(|field| field.data_type())
                    .collect::<Vec<_>>(),
                input
                    .schema()
                    .fields()
                    .iter()
                    .map(|field| field.data_type())
                    .collect::<Vec<_>>()
            );
        }
        self.schema()
            .logically_equivalent_names_and_types(&input.schema())?;

        if insert_op != InsertOp::Append {
            return not_impl_err!("{insert_op} not implemented for MemoryTable yet");
        }
75 changes: 75 additions & 0 deletions datafusion/core/tests/dataframe/mod.rs
@@ -5274,3 +5274,78 @@ async fn register_non_parquet_file() {
"1.json' does not match the expected extension '.parquet'"
);
}

// Test insert-into schema checking.
#[tokio::test]
async fn test_insert_into_checking() -> Result<()> {
    // Create a new schema with one field called "a" of type Int64, and set nullable to false
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));

    let session_ctx = SessionContext::new();

    // Create and register the initial table with the provided schema and data
    let initial_table = Arc::new(MemTable::try_new(schema.clone(), vec![vec![]])?);
    session_ctx.register_table("t", initial_table.clone())?;

    // There are three cases we need to check
Contributor:

Suggested change:
    // There are three cases we need to check
    // There are two cases we need to check

Contributor Author (@zhuqi-lucas):

Good catch! Addressed in the latest PR.

    // 1. The len of the schema of the plan and the schema of the table should be the same
    // 2. The nullable flag of the schema of the plan and the schema of the table should be the same
    // 3. The datatype of the schema of the plan and the schema of the table should be the same

    // Test case 1:
    let write_df = session_ctx.sql("values (1, 2), (3, 4)").await.unwrap();

    match write_df
        .write_table("t", DataFrameWriteOptions::new())
        .await
    {
        Ok(_) => {}
        Err(e) => {
            assert_contains!(
                e.to_string(),
                "Inserting query must have the same schema length as the table."
            );
        }
    }
Contributor:

I think you can write this much more concisely using unwrap_err:

    let e = write_df
        .write_table("t", DataFrameWriteOptions::new())
        .await
        .unwrap_err();
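
To make that concrete, a hedged completion of the suggested pattern (paired with the same expected message as above) could be:

    // Sketch: unwrap the expected error and assert on its message directly.
    let e = write_df
        .write_table("t", DataFrameWriteOptions::new())
        .await
        .unwrap_err();
    assert_contains!(
        e.to_string(),
        "Inserting query must have the same schema length as the table."
    );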

Contributor:

The same comment applies to the code below as well.

Contributor Author (@zhuqi-lucas):

Good idea, thanks @alamb; addressed in the latest PR.


    // Test case 2:
    let write_df = session_ctx.sql("values (null), (12)").await.unwrap();

    match write_df
        .write_table("t", DataFrameWriteOptions::new())
        .await
    {
        Ok(_) => {}
        Err(e) => {
            assert_contains!(
                e.to_string(),
                "Inserting query must have the same schema nullability as the table."
            );
        }
    }

    // Setting nullable to true
    // Make sure the nullable check goes through
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));

    let session_ctx = SessionContext::new();

    // Create and register the initial table with the provided schema and data
    let initial_table = Arc::new(MemTable::try_new(schema.clone(), vec![vec![]])?);
    session_ctx.register_table("t", initial_table.clone())?;

    // Test case 3:
    let write_df = session_ctx.sql("values ('a123'), ('b456')").await.unwrap();

    match write_df
        .write_table("t", DataFrameWriteOptions::new())
        .await
    {
        Ok(_) => {}
        Err(e) => {
            assert_contains!(e.to_string(), "Inserting query schema mismatch: Expected table field 'a' with type Int64, but got 'column1' with type Utf8");
        }
    }

    Ok(())
}
29 changes: 28 additions & 1 deletion datafusion/sqllogictest/test_files/aggregate_skip_partial.slt
@@ -220,6 +220,9 @@ set datafusion.execution.batch_size = 4;

# Inserting into nullable table with batch_size specified above
# to prevent creation on single in-memory batch

# Test error case: an error is expected here, as column c5 of the table is not
# nullable and the query tries to insert a nullable value
statement ok
CREATE TABLE aggregate_test_100_null (
c2 TINYINT NOT NULL,
@@ -228,6 +231,30 @@ CREATE TABLE aggregate_test_100_null (
c11 FLOAT
);

statement error
INSERT INTO aggregate_test_100_null
SELECT
c2,
c5,
CASE WHEN c1 = 'e' THEN NULL ELSE c3 END as c3,
CASE WHEN c1 = 'a' THEN NULL ELSE c11 END as c11
FROM aggregate_test_100;
----
DataFusion error: Error during planning: Inserting query must have the same schema nullability as the table. Expected table field 'c5' nullability: false, got field: 'c5', nullability: true


statement ok
drop table aggregate_test_100_null;

# Test the successful insert case where c5 is nullable
statement ok
CREATE TABLE aggregate_test_100_null (
c2 TINYINT NOT NULL,
c5 INT,
Contributor Author (@zhuqi-lucas, Feb 11, 2025):

Noted, I also added the successful case where the table field c5 is nullable.

c3 SMALLINT,
c11 FLOAT
);

statement ok
INSERT INTO aggregate_test_100_null
SELECT
@@ -506,7 +533,7 @@ SELECT
avg(c11) FILTER (WHERE c2 != 5)
FROM aggregate_test_100 GROUP BY c1 ORDER BY c1;
----
a 2.5 0.449071887467
a 2.5 0.449071887467
b 2.642857142857 0.445486298629
c 2.421052631579 0.422882117723
d 2.125 0.518706191331
14 changes: 10 additions & 4 deletions datafusion/sqllogictest/test_files/insert.slt
@@ -296,8 +296,11 @@ insert into table_without_values(field1) values(3);
1

# insert NULL values for the missing column (field1), but column is non-nullable
statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable
statement error
insert into table_without_values(field2) values(300);
----
DataFusion error: Error during planning: Inserting query must have the same schema nullability as the table. Expected table field 'field1' nullability: false, got field: 'field1', nullability: true

Contributor Author (@zhuqi-lucas):

This is because we now have this check in the insert-into plan; previously we only had the check in the execution plan.


statement error Invalid argument error: Column 'column1' is declared as non-nullable but contains null values
insert into table_without_values values(NULL, 300);
@@ -358,7 +361,7 @@ statement ok
create table test_column_defaults(
a int,
b int not null default null,
c int default 100*2+300,
c int default 100*2+300,
d text default lower('DEFAULT_TEXT'),
e timestamp default now()
)
@@ -368,8 +371,11 @@ insert into test_column_defaults values(1, 10, 100, 'ABC', now())
----
1

statement error DataFusion error: Execution error: Invalid batch column at '1' has null but schema specifies non-nullable
statement error
insert into test_column_defaults(a) values(2)
----
DataFusion error: Error during planning: Inserting query must have the same schema nullability as the table. Expected table field 'b' nullability: false, got field: 'b', nullability: true


query I
insert into test_column_defaults(b) values(20)
@@ -412,7 +418,7 @@ statement ok
create table test_column_defaults(
a int,
b int not null default null,
c int default 100*2+300,
c int default 100*2+300,
d text default lower('DEFAULT_TEXT'),
e timestamp default now()
) as values(1, 10, 100, 'ABC', now())
27 changes: 16 additions & 11 deletions datafusion/sqllogictest/test_files/insert_to_external.slt
@@ -60,17 +60,17 @@ STORED AS parquet
LOCATION 'test_files/scratch/insert_to_external/parquet_types_partitioned/'
PARTITIONED BY (b);

query I
# query error expected here because PARTITIONED BY (b) makes b non-nullable
query error
insert into dictionary_encoded_parquet_partitioned
select * from dictionary_encoded_values
----
2
DataFusion error: Error during planning: Inserting query must have the same schema nullability as the table. Expected table field 'b' nullability: false, got field: 'b', nullability: true
Contributor Author (@zhuqi-lucas):

This is also expected, because PARTITIONED BY (b) makes b non-nullable.

I don't think we should support inserting nullable values for a partition key.

Contributor:

The error should come after "query error" to pass CI.

Contributor:

"because PARTITIONED BY (b) will make b non-nullable" can be added as a comment.

Contributor Author (@zhuqi-lucas, Feb 12, 2025):

Added comments in the latest PR, thanks! @jayzhan211

On "the error should be after query error to pass CI": I think it's auto-generated starting from this PR:

#14439

For example:

https://github.com/apache/datafusion/pull/14439/files#diff-51757b2b1d0a07b88551d88eabeba7f74e11b5217e44203ac7c6f613c0221196R273

Contributor:

It was auto-generated long before that; we need to move it manually.

Contributor:

But why is CI green? 🤔

Contributor Author (@zhuqi-lucas, Feb 12, 2025):

@jayzhan211 This change (https://github.com/apache/datafusion/pull/14439/files#diff-51757b2b1d0a07b88551d88eabeba7f74e11b5217e44203ac7c6f613c0221196R273) merged less than a week ago, so I think it starts from there. I also ran the local sqllogictest with -- --complete and got the same result; I guess CI also uses it to generate and verify?

We may need a follow-up issue to investigate it.

query TT
select * from dictionary_encoded_parquet_partitioned order by (a);
----
a foo
b bar

statement ok
CREATE EXTERNAL TABLE dictionary_encoded_arrow_partitioned(
Expand All @@ -81,11 +81,13 @@ STORED AS arrow
LOCATION 'test_files/scratch/insert_to_external/arrow_dict_partitioned/'
PARTITIONED BY (b);

query I
# query error expected here because PARTITIONED BY (b) makes b non-nullable
query error
insert into dictionary_encoded_arrow_partitioned
select * from dictionary_encoded_values
----
2
DataFusion error: Error during planning: Inserting query must have the same schema nullability as the table. Expected table field 'b' nullability: false, got field: 'b', nullability: true


statement ok
CREATE EXTERNAL TABLE dictionary_encoded_arrow_test_readback(
Expand All @@ -97,13 +99,10 @@ LOCATION 'test_files/scratch/insert_to_external/arrow_dict_partitioned/b=bar/';
query T
select * from dictionary_encoded_arrow_test_readback;
----
b

query TT
select * from dictionary_encoded_arrow_partitioned order by (a);
----
a foo
b bar


# test_insert_into
@@ -543,8 +542,11 @@ insert into table_without_values(field1) values(3);
1

# insert NULL values for the missing column (field1), but column is non-nullable
statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable
statement error
insert into table_without_values(field2) values(300);
----
DataFusion error: Error during planning: Inserting query must have the same schema nullability as the table. Expected table field 'field1' nullability: false, got field: 'field1', nullability: true


statement error Invalid argument error: Column 'column1' is declared as non-nullable but contains null values
insert into table_without_values values(NULL, 300);
@@ -581,8 +583,11 @@ insert into test_column_defaults values(1, 10, 100, 'ABC', now())
----
1

statement error DataFusion error: Execution error: Invalid batch column at '1' has null but schema specifies non-nullable
statement error
insert into test_column_defaults(a) values(2)
----
DataFusion error: Error during planning: Inserting query must have the same schema nullability as the table. Expected table field 'b' nullability: false, got field: 'b', nullability: true


query I
insert into test_column_defaults(b) values(20)