Skip to content

Commit 4f6062e

Browse files
rkrishn7 and alamb authored
fix: Capture nullability in Values node planning (#14472)
* fix: Capture nullability in `Values` node planning
* fix: Clippy errors
* fix: Insert plan tests (push down casts to values node)
* fix: Insert schema error messages
* refactor: create values fields container

---------

Co-authored-by: Andrew Lamb <[email protected]>
1 parent ab9fc20 commit 4f6062e

File tree

5 files changed

+51
-30
lines changed

5 files changed

+51
-30
lines changed

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 35 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -242,9 +242,10 @@ impl LogicalPlanBuilder {
242242
schema: &DFSchema,
243243
) -> Result<Self> {
244244
let n_cols = values[0].len();
245-
let mut field_types: Vec<DataType> = Vec::with_capacity(n_cols);
245+
let mut fields = ValuesFields::new();
246246
for j in 0..n_cols {
247247
let field_type = schema.field(j).data_type();
248+
let field_nullable = schema.field(j).is_nullable();
248249
for row in values.iter() {
249250
let value = &row[j];
250251
let data_type = value.get_type(schema)?;
@@ -260,17 +261,17 @@ impl LogicalPlanBuilder {
260261
}
261262
}
262263
}
263-
field_types.push(field_type.to_owned());
264+
fields.push(field_type.to_owned(), field_nullable);
264265
}
265266

266-
Self::infer_inner(values, &field_types, schema)
267+
Self::infer_inner(values, fields, schema)
267268
}
268269

269270
fn infer_data(values: Vec<Vec<Expr>>) -> Result<Self> {
270271
let n_cols = values[0].len();
271272
let schema = DFSchema::empty();
273+
let mut fields = ValuesFields::new();
272274

273-
let mut field_types: Vec<DataType> = Vec::with_capacity(n_cols);
274275
for j in 0..n_cols {
275276
let mut common_type: Option<DataType> = None;
276277
for (i, row) in values.iter().enumerate() {
@@ -293,37 +294,30 @@ impl LogicalPlanBuilder {
293294
}
294295
// assuming common_type was not set, and no error, therefore the type should be NULL
295296
// since the code loop skips NULL
296-
field_types.push(common_type.unwrap_or(DataType::Null));
297+
fields.push(common_type.unwrap_or(DataType::Null), true);
297298
}
298299

299-
Self::infer_inner(values, &field_types, &schema)
300+
Self::infer_inner(values, fields, &schema)
300301
}
301302

302303
fn infer_inner(
303304
mut values: Vec<Vec<Expr>>,
304-
field_types: &[DataType],
305+
fields: ValuesFields,
305306
schema: &DFSchema,
306307
) -> Result<Self> {
308+
let fields = fields.into_fields();
307309
// wrap cast if data type is not same as common type.
308310
for row in &mut values {
309-
for (j, field_type) in field_types.iter().enumerate() {
311+
for (j, field_type) in fields.iter().map(|f| f.data_type()).enumerate() {
310312
if let Expr::Literal(ScalarValue::Null) = row[j] {
311313
row[j] = Expr::Literal(ScalarValue::try_from(field_type)?);
312314
} else {
313315
row[j] = std::mem::take(&mut row[j]).cast_to(field_type, schema)?;
314316
}
315317
}
316318
}
317-
let fields = field_types
318-
.iter()
319-
.enumerate()
320-
.map(|(j, data_type)| {
321-
// naming is following convention https://www.postgresql.org/docs/current/queries-values.html
322-
let name = &format!("column{}", j + 1);
323-
Field::new(name, data_type.clone(), true)
324-
})
325-
.collect::<Vec<_>>();
326-
let dfschema = DFSchema::from_unqualified_fields(fields.into(), HashMap::new())?;
319+
320+
let dfschema = DFSchema::from_unqualified_fields(fields, HashMap::new())?;
327321
let schema = DFSchemaRef::new(dfschema);
328322

329323
Ok(Self::new(LogicalPlan::Values(Values { schema, values })))
@@ -1320,6 +1314,29 @@ impl From<Arc<LogicalPlan>> for LogicalPlanBuilder {
13201314
}
13211315
}
13221316

1317+
/// Container used when building fields for a `VALUES` node.
1318+
#[derive(Default)]
1319+
struct ValuesFields {
1320+
inner: Vec<Field>,
1321+
}
1322+
1323+
impl ValuesFields {
1324+
pub fn new() -> Self {
1325+
Self::default()
1326+
}
1327+
1328+
pub fn push(&mut self, data_type: DataType, nullable: bool) {
1329+
// Naming follows the convention described here:
1330+
// https://www.postgresql.org/docs/current/queries-values.html
1331+
let name = format!("column{}", self.inner.len() + 1);
1332+
self.inner.push(Field::new(name, data_type, nullable));
1333+
}
1334+
1335+
pub fn into_fields(self) -> Fields {
1336+
self.inner.into()
1337+
}
1338+
}
1339+
13231340
pub fn change_redundant_column(fields: &Fields) -> Vec<Field> {
13241341
let mut name_map = HashMap::new();
13251342
fields

datafusion/sql/src/statement.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1897,6 +1897,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
18971897
let column_index = table_schema
18981898
.index_of_column_by_name(None, &c)
18991899
.ok_or_else(|| unqualified_field_not_found(&c, &table_schema))?;
1900+
19001901
if value_indices[column_index].is_some() {
19011902
return schema_err!(SchemaError::DuplicateUnqualifiedField {
19021903
name: c,
@@ -1937,6 +1938,9 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
19371938
// Projection
19381939
let mut planner_context =
19391940
PlannerContext::new().with_prepare_param_data_types(prepare_param_data_types);
1941+
planner_context.set_table_schema(Some(DFSchemaRef::new(
1942+
DFSchema::from_unqualified_fields(fields.clone(), Default::default())?,
1943+
)));
19401944
let source = self.query_to_plan(*source, &mut planner_context)?;
19411945
if fields.len() != source.schema().fields().len() {
19421946
plan_err!("Column count doesn't match insert query!")?;

datafusion/sql/tests/sql_integration.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -468,10 +468,10 @@ fn plan_insert() {
468468
let sql =
469469
"insert into person (id, first_name, last_name) values (1, 'Alan', 'Turing')";
470470
let plan = "Dml: op=[Insert Into] table=[person]\
471-
\n Projection: CAST(column1 AS UInt32) AS id, column2 AS first_name, column3 AS last_name, \
471+
\n Projection: column1 AS id, column2 AS first_name, column3 AS last_name, \
472472
CAST(NULL AS Int32) AS age, CAST(NULL AS Utf8) AS state, CAST(NULL AS Float64) AS salary, \
473473
CAST(NULL AS Timestamp(Nanosecond, None)) AS birth_date, CAST(NULL AS Int32) AS 😀\
474-
\n Values: (Int64(1), Utf8(\"Alan\"), Utf8(\"Turing\"))";
474+
\n Values: (CAST(Int64(1) AS UInt32), Utf8(\"Alan\"), Utf8(\"Turing\"))";
475475
quick_test(sql, plan);
476476
}
477477

@@ -480,8 +480,8 @@ fn plan_insert_no_target_columns() {
480480
let sql = "INSERT INTO test_decimal VALUES (1, 2), (3, 4)";
481481
let plan = r#"
482482
Dml: op=[Insert Into] table=[test_decimal]
483-
Projection: CAST(column1 AS Int32) AS id, CAST(column2 AS Decimal128(10, 2)) AS price
484-
Values: (Int64(1), Int64(2)), (Int64(3), Int64(4))
483+
Projection: column1 AS id, column2 AS price
484+
Values: (CAST(Int64(1) AS Int32), CAST(Int64(2) AS Decimal128(10, 2))), (CAST(Int64(3) AS Int32), CAST(Int64(4) AS Decimal128(10, 2)))
485485
"#
486486
.trim();
487487
quick_test(sql, plan);
@@ -499,11 +499,11 @@ Dml: op=[Insert Into] table=[test_decimal]
499499
)]
500500
#[case::target_column_count_mismatch(
501501
"INSERT INTO person (id, first_name, last_name) VALUES ($1, $2)",
502-
"Error during planning: Column count doesn't match insert query!"
502+
"Error during planning: Inconsistent data length across values list: got 2 values in row 0 but expected 3"
503503
)]
504504
#[case::source_column_count_mismatch(
505505
"INSERT INTO person VALUES ($1, $2)",
506-
"Error during planning: Column count doesn't match insert query!"
506+
"Error during planning: Inconsistent data length across values list: got 2 values in row 0 but expected 8"
507507
)]
508508
#[case::extra_placeholder(
509509
"INSERT INTO person (id, first_name, last_name) VALUES ($1, $2, $3, $4)",

datafusion/sqllogictest/test_files/insert.slt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ insert into table_without_values(id, id) values(3, 3);
255255
statement error Arrow error: Cast error: Cannot cast string 'zoo' to value of Int64 type
256256
insert into table_without_values(name, id) values(4, 'zoo');
257257

258-
statement error Error during planning: Column count doesn't match insert query!
258+
statement error Error during planning: Inconsistent data length across values list: got 2 values in row 0 but expected 1
259259
insert into table_without_values(id) values(4, 'zoo');
260260

261261
# insert NULL values for the missing column (name)
@@ -299,10 +299,10 @@ insert into table_without_values(field1) values(3);
299299
statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable
300300
insert into table_without_values(field2) values(300);
301301

302-
statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable
302+
statement error Invalid argument error: Column 'column1' is declared as non-nullable but contains null values
303303
insert into table_without_values values(NULL, 300);
304304

305-
statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable
305+
statement error Invalid argument error: Column 'column1' is declared as non-nullable but contains null values
306306
insert into table_without_values values(3, 300), (NULL, 400);
307307

308308
query II rowsort

datafusion/sqllogictest/test_files/insert_to_external.slt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -500,7 +500,7 @@ insert into table_without_values(id, id) values(3, 3);
500500
statement error Arrow error: Cast error: Cannot cast string 'zoo' to value of Int64 type
501501
insert into table_without_values(name, id) values(4, 'zoo');
502502

503-
statement error Error during planning: Column count doesn't match insert query!
503+
statement error Error during planning: Inconsistent data length across values list: got 2 values in row 0 but expected 1
504504
insert into table_without_values(id) values(4, 'zoo');
505505

506506
# insert NULL values for the missing column (name)
@@ -546,10 +546,10 @@ insert into table_without_values(field1) values(3);
546546
statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable
547547
insert into table_without_values(field2) values(300);
548548

549-
statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable
549+
statement error Invalid argument error: Column 'column1' is declared as non-nullable but contains null values
550550
insert into table_without_values values(NULL, 300);
551551

552-
statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable
552+
statement error Invalid argument error: Column 'column1' is declared as non-nullable but contains null values
553553
insert into table_without_values values(3, 300), (NULL, 400);
554554

555555
query II rowsort

0 commit comments

Comments (0)