bug: improve schema checking for insert into cases #14572

Merged
merged 7 commits into from Feb 17, 2025
49 changes: 33 additions & 16 deletions datafusion/common/src/dfschema.rs
@@ -1002,12 +1002,14 @@ pub trait SchemaExt {
/// It works the same as [`DFSchema::equivalent_names_and_types`].
fn equivalent_names_and_types(&self, other: &Self) -> bool;

/// Returns true if the two schemas have the same qualified named
/// fields with logically equivalent data types. Returns false otherwise.
/// Returns `Ok(())` if the two schemas have the same qualified named
/// fields with logically equivalent data types. Returns a plan error otherwise.
///
/// Use [DFSchema]::equivalent_names_and_types for stricter semantic type
/// equivalence checking.
fn logically_equivalent_names_and_types(&self, other: &Self) -> bool;
///
/// It is only used by `INSERT INTO` cases.
fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()>;
}
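For orientation, a minimal sketch (not part of this PR) of how a `TableProvider::insert_into` implementation is expected to consume the new fallible API, mirroring the call sites changed below. It assumes `SchemaExt` is re-exported from `datafusion_common` and that both schemas are plain Arrow `Schema`s:

use arrow_schema::Schema;
use datafusion_common::{Result, SchemaExt};

// Sketch under the stated assumptions: the table provider validates the
// inserting query's schema and bubbles the descriptive plan error up with `?`,
// instead of building its own error message.
fn check_insert_schema(table_schema: &Schema, input_schema: &Schema) -> Result<()> {
    table_schema.logically_equivalent_names_and_types(input_schema)?;
    Ok(())
}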

impl SchemaExt for Schema {
@@ -1028,21 +1030,36 @@ impl SchemaExt for Schema {
})
}

fn logically_equivalent_names_and_types(&self, other: &Self) -> bool {
// It is only used by insert into cases.
fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()> {
// case 1 : schema length mismatch
if self.fields().len() != other.fields().len() {
return false;
_plan_err!(
"Inserting query must have the same schema length as the table. \
Expected table schema length: {}, got: {}",
self.fields().len(),
other.fields().len()
)
} else {
// case 2 : schema lengths match, but fields mismatch
// check whether the field names are the same and the data types are logically equal
self.fields()
.iter()
.zip(other.fields().iter())
.try_for_each(|(f1, f2)| {
if f1.name() != f2.name() || !DFSchema::datatype_is_logically_equal(f1.data_type(), f2.data_type()) {
_plan_err!(
"Inserting query schema mismatch: Expected table field '{}' with type {:?}, \
but got '{}' with type {:?}.",
f1.name(),
f1.data_type(),
f2.name(),
f2.data_type())
} else {
Ok(())
}
})
}

[inline review thread on the error message above]
Member: Based on the function name, it seems that we haven't restricted it to only be used for insertion 🤔.
Author: Checked that the function is only called by insert into cases.
Contributor: Perhaps we could update the comments to reflect this.
Author: Added comments in the latest PR, thanks all.

self.fields()
.iter()
.zip(other.fields().iter())
.all(|(f1, f2)| {
f1.name() == f2.name()
&& DFSchema::datatype_is_logically_equal(
f1.data_type(),
f2.data_type(),
)
})
}
}
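To make the new contract concrete, a hedged sketch of the possible outcomes, matching the two error cases labelled above (assumes `arrow_schema` and `datafusion_common` as in this file; note that nullability is deliberately not part of this check):

use arrow_schema::{DataType, Field, Schema};
use datafusion_common::SchemaExt;

fn demo() {
    let table = Schema::new(vec![Field::new("a", DataType::Int64, false)]);

    // case 1: schema length mismatch -> plan error about schema length
    let two_cols = Schema::new(vec![
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Int64, false),
    ]);
    assert!(table.logically_equivalent_names_and_types(&two_cols).is_err());

    // case 2: same length, but a field name or type differs -> field-level plan error
    let wrong_type = Schema::new(vec![Field::new("a", DataType::Utf8, false)]);
    assert!(table.logically_equivalent_names_and_types(&wrong_type).is_err());

    // matching names and logically equal types -> Ok(()); nullability is ignored here
    let same = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
    assert!(table.logically_equivalent_names_and_types(&same).is_ok());
}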

23 changes: 2 additions & 21 deletions datafusion/core/src/datasource/listing/table.rs
@@ -996,27 +996,8 @@ impl TableProvider for ListingTable {
insert_op: InsertOp,
) -> Result<Arc<dyn ExecutionPlan>> {
// Check that the schema of the plan matches the schema of this table.
if !self
.schema()
.logically_equivalent_names_and_types(&input.schema())
{
// Return an error if schema of the input query does not match with the table schema.
return plan_err!(
"Inserting query must have the same schema with the table. \
Expected: {:?}, got: {:?}",
self.schema()
.fields()
.iter()
.map(|field| field.data_type())
.collect::<Vec<_>>(),
input
.schema()
.fields()
.iter()
.map(|field| field.data_type())
.collect::<Vec<_>>()
);
}
self.schema()
.logically_equivalent_names_and_types(&input.schema())?;

let table_path = &self.table_paths()[0];
if !table_path.is_collection() {
23 changes: 3 additions & 20 deletions datafusion/core/src/datasource/memory.rs
@@ -278,26 +278,9 @@ impl TableProvider for MemTable {

// Create a physical plan from the logical plan.
// Check that the schema of the plan matches the schema of this table.
if !self
.schema()
.logically_equivalent_names_and_types(&input.schema())
{
return plan_err!(
"Inserting query must have the same schema with the table. \
Expected: {:?}, got: {:?}",
self.schema()
.fields()
.iter()
.map(|field| field.data_type())
.collect::<Vec<_>>(),
input
.schema()
.fields()
.iter()
.map(|field| field.data_type())
.collect::<Vec<_>>()
);
}
self.schema()
.logically_equivalent_names_and_types(&input.schema())?;

if insert_op != InsertOp::Append {
return not_impl_err!("{insert_op} not implemented for MemoryTable yet");
}
52 changes: 52 additions & 0 deletions datafusion/core/tests/dataframe/mod.rs
@@ -5274,3 +5274,55 @@ async fn register_non_parquet_file() {
"1.json' does not match the expected extension '.parquet'"
);
}

// Test insert-into schema checking.
#[tokio::test]
async fn test_insert_into_checking() -> Result<()> {
// Create a new schema with one field called "a" of type Int64, with nullable set to false
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));

let session_ctx = SessionContext::new();

// Create and register the initial table with the provided schema and data
let initial_table = Arc::new(MemTable::try_new(schema.clone(), vec![vec![]])?);
session_ctx.register_table("t", initial_table.clone())?;

// There are two cases we need to check
// 1. The schema of the plan and the schema of the table must have the same length
// 2. The data types in the plan's schema and the table's schema must match

// Test case 1:
let write_df = session_ctx.sql("values (1, 2), (3, 4)").await.unwrap();

let e = write_df
.write_table("t", DataFrameWriteOptions::new())
.await
.unwrap_err();

assert_contains!(
e.to_string(),
"Inserting query must have the same schema length as the table."
);

// Set nullable to true
// to make sure the nullability check passes
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));

let session_ctx = SessionContext::new();

// Create and register the initial table with the provided schema and data
let initial_table = Arc::new(MemTable::try_new(schema.clone(), vec![vec![]])?);
session_ctx.register_table("t", initial_table.clone())?;

// Test case 2:
let write_df = session_ctx.sql("values ('a123'), ('b456')").await.unwrap();

let e = write_df
.write_table("t", DataFrameWriteOptions::new())
.await
.unwrap_err();

assert_contains!(e.to_string(), "Inserting query schema mismatch: Expected table field 'a' with type Int64, but got 'column1' with type Utf8");

Ok(())
}
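A hypothetical follow-up (not part of this PR's tests): one way for a caller to satisfy the new name check is to alias the query's auto-generated VALUES column (`column1`) to the table's field name before writing. This sketch assumes the nullable table `t` from case 2 above and the `col`/`alias` helpers from `datafusion::prelude`:

// Sketch only: rename `column1` to `a` so names and types line up with `t`.
let write_df = session_ctx
    .sql("values (1), (2)")
    .await?
    .select(vec![col("column1").alias("a")])?;
write_df
    .write_table("t", DataFrameWriteOptions::new())
    .await?;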
@@ -220,6 +220,7 @@ set datafusion.execution.batch_size = 4;

# Inserting into nullable table with batch_size specified above
# to prevent creation of a single in-memory batch

statement ok
CREATE TABLE aggregate_test_100_null (
c2 TINYINT NOT NULL,
@@ -506,7 +507,7 @@ SELECT
avg(c11) FILTER (WHERE c2 != 5)
FROM aggregate_test_100 GROUP BY c1 ORDER BY c1;
----
a 2.5 0.449071887467
a 2.5 0.449071887467
b 2.642857142857 0.445486298629
c 2.421052631579 0.422882117723
d 2.125 0.518706191331
14 changes: 10 additions & 4 deletions datafusion/sqllogictest/test_files/insert.slt
@@ -296,8 +296,11 @@ insert into table_without_values(field1) values(3);
1

# insert NULL values for the missing column (field1), but column is non-nullable
statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable
statement error
insert into table_without_values(field2) values(300);
----
DataFusion error: Execution error: Invalid batch column at '0' has null but schema specifies non-nullable

[inline review thread]
Author: This is because we now have this check in the insert-into plan; before this change we only had the check in the execution plan.


statement error Invalid argument error: Column 'column1' is declared as non-nullable but contains null values
insert into table_without_values values(NULL, 300);
@@ -358,7 +361,7 @@ statement ok
create table test_column_defaults(
a int,
b int not null default null,
c int default 100*2+300,
c int default 100*2+300,
d text default lower('DEFAULT_TEXT'),
e timestamp default now()
)
@@ -368,8 +371,11 @@ insert into test_column_defaults values(1, 10, 100, 'ABC', now())
----
1

statement error DataFusion error: Execution error: Invalid batch column at '1' has null but schema specifies non-nullable
statement error
insert into test_column_defaults(a) values(2)
----
DataFusion error: Execution error: Invalid batch column at '1' has null but schema specifies non-nullable


query I
insert into test_column_defaults(b) values(20)
@@ -412,7 +418,7 @@ statement ok
create table test_column_defaults(
a int,
b int not null default null,
c int default 100*2+300,
c int default 100*2+300,
d text default lower('DEFAULT_TEXT'),
e timestamp default now()
) as values(1, 10, 100, 'ABC', now())
12 changes: 10 additions & 2 deletions datafusion/sqllogictest/test_files/insert_to_external.slt
@@ -60,6 +60,7 @@ STORED AS parquet
LOCATION 'test_files/scratch/insert_to_external/parquet_types_partitioned/'
PARTITIONED BY (b);

# query error here because PARTITIONED BY (b) will make b non-nullable
query I
insert into dictionary_encoded_parquet_partitioned
select * from dictionary_encoded_values
@@ -81,6 +82,7 @@ STORED AS arrow
LOCATION 'test_files/scratch/insert_to_external/arrow_dict_partitioned/'
PARTITIONED BY (b);

# query error here because PARTITIONED BY (b) will make b non-nullable
query I
insert into dictionary_encoded_arrow_partitioned
select * from dictionary_encoded_values
@@ -543,8 +545,11 @@ insert into table_without_values(field1) values(3);
1

# insert NULL values for the missing column (field1), but column is non-nullable
statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable
statement error
insert into table_without_values(field2) values(300);
----
DataFusion error: Execution error: Invalid batch column at '0' has null but schema specifies non-nullable


statement error Invalid argument error: Column 'column1' is declared as non-nullable but contains null values
insert into table_without_values values(NULL, 300);
@@ -581,8 +586,11 @@ insert into test_column_defaults values(1, 10, 100, 'ABC', now())
----
1

statement error DataFusion error: Execution error: Invalid batch column at '1' has null but schema specifies non-nullable
statement error
insert into test_column_defaults(a) values(2)
----
DataFusion error: Execution error: Invalid batch column at '1' has null but schema specifies non-nullable


query I
insert into test_column_defaults(b) values(20)