Skip to content

Commit 48bff75

Browse files
authored
Provide field and schema metadata missing on cross joins, and union with null fields. (#12729)
* test: reproducer for missing schema metadata on cross join * fix: pass thru schema metadata on cross join * fix: preserve metadata when transforming to view types * test: reproducer for missing field metadata in left hand NULL field of union * fix: preserve field metadata from right side of union * chore: safe indexing
1 parent d4bc1c1 commit 48bff75

File tree

5 files changed

+65
-16
lines changed

5 files changed

+65
-16
lines changed

datafusion/core/src/datasource/file_format/mod.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -241,16 +241,14 @@ pub fn transform_schema_to_view(schema: &Schema) -> Schema {
241241
.fields
242242
.iter()
243243
.map(|field| match field.data_type() {
244-
DataType::Utf8 | DataType::LargeUtf8 => Arc::new(Field::new(
245-
field.name(),
246-
DataType::Utf8View,
247-
field.is_nullable(),
248-
)),
249-
DataType::Binary | DataType::LargeBinary => Arc::new(Field::new(
250-
field.name(),
251-
DataType::BinaryView,
252-
field.is_nullable(),
253-
)),
244+
DataType::Utf8 | DataType::LargeUtf8 => Arc::new(
245+
Field::new(field.name(), DataType::Utf8View, field.is_nullable())
246+
.with_metadata(field.metadata().to_owned()),
247+
),
248+
DataType::Binary | DataType::LargeBinary => Arc::new(
249+
Field::new(field.name(), DataType::BinaryView, field.is_nullable())
250+
.with_metadata(field.metadata().to_owned()),
251+
),
254252
_ => field.clone(),
255253
})
256254
.collect();

datafusion/physical-plan/src/joins/cross_join.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,15 +69,22 @@ impl CrossJoinExec {
6969
/// Create a new [CrossJoinExec].
7070
pub fn new(left: Arc<dyn ExecutionPlan>, right: Arc<dyn ExecutionPlan>) -> Self {
7171
// left then right
72-
let all_columns: Fields = {
72+
let (all_columns, metadata) = {
7373
let left_schema = left.schema();
7474
let right_schema = right.schema();
7575
let left_fields = left_schema.fields().iter();
7676
let right_fields = right_schema.fields().iter();
77-
left_fields.chain(right_fields).cloned().collect()
77+
78+
let mut metadata = left_schema.metadata().clone();
79+
metadata.extend(right_schema.metadata().clone());
80+
81+
(
82+
left_fields.chain(right_fields).cloned().collect::<Fields>(),
83+
metadata,
84+
)
7885
};
7986

80-
let schema = Arc::new(Schema::new(all_columns));
87+
let schema = Arc::new(Schema::new(all_columns).with_metadata(metadata));
8188
let cache = Self::compute_properties(&left, &right, Arc::clone(&schema));
8289
CrossJoinExec {
8390
left,

datafusion/physical-plan/src/union.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -474,7 +474,16 @@ fn union_schema(inputs: &[Arc<dyn ExecutionPlan>]) -> SchemaRef {
474474
.iter()
475475
.filter_map(|input| {
476476
if input.schema().fields().len() > i {
477-
Some(input.schema().field(i).clone())
477+
let field = input.schema().field(i).clone();
478+
let right_hand_metdata = inputs
479+
.get(1)
480+
.map(|right_input| {
481+
right_input.schema().field(i).metadata().clone()
482+
})
483+
.unwrap_or_default();
484+
let mut metadata = field.metadata().clone();
485+
metadata.extend(right_hand_metdata);
486+
Some(field.with_metadata(metadata))
478487
} else {
479488
None
480489
}

datafusion/sqllogictest/src/test_context.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,8 +314,13 @@ pub async fn register_metadata_tables(ctx: &SessionContext) {
314314
String::from("metadata_key"),
315315
String::from("the name field"),
316316
)]));
317+
let l_name =
318+
Field::new("l_name", DataType::Utf8, true).with_metadata(HashMap::from([(
319+
String::from("metadata_key"),
320+
String::from("the l_name field"),
321+
)]));
317322

318-
let schema = Schema::new(vec![id, name]).with_metadata(HashMap::from([(
323+
let schema = Schema::new(vec![id, name, l_name]).with_metadata(HashMap::from([(
319324
String::from("metadata_key"),
320325
String::from("the entire schema"),
321326
)]));
@@ -325,6 +330,7 @@ pub async fn register_metadata_tables(ctx: &SessionContext) {
325330
vec![
326331
Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])) as _,
327332
Arc::new(StringArray::from(vec![None, Some("bar"), Some("baz")])) as _,
333+
Arc::new(StringArray::from(vec![None, Some("l_bar"), Some("l_baz")])) as _,
328334
],
329335
)
330336
.unwrap();

datafusion/sqllogictest/test_files/metadata.slt

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
## with metadata in SQL.
2626

2727
query IT
28-
select * from table_with_metadata;
28+
select id, name from table_with_metadata;
2929
----
3030
1 NULL
3131
NULL bar
@@ -96,5 +96,34 @@ select count(id) cnt from table_with_metadata group by name order by cnt;
9696
1
9797

9898

99+
100+
# Regression test: missing schema metadata, when aggregate on cross join
101+
query I
102+
SELECT count("data"."id")
103+
FROM
104+
(
105+
SELECT "id" FROM "table_with_metadata"
106+
) as "data",
107+
(
108+
SELECT "id" FROM "table_with_metadata"
109+
) as "samples";
110+
----
111+
6
112+
113+
# Regression test: missing field metadata, from the NULL field on the left side of the union
114+
query ITT
115+
(SELECT id, NULL::string as name, l_name FROM "table_with_metadata")
116+
UNION
117+
(SELECT id, name, NULL::string as l_name FROM "table_with_metadata")
118+
ORDER BY id, name, l_name;
119+
----
120+
1 NULL NULL
121+
3 baz NULL
122+
3 NULL l_baz
123+
NULL bar NULL
124+
NULL NULL l_bar
125+
126+
127+
99128
statement ok
100129
drop table table_with_metadata;

0 commit comments

Comments
 (0)