Skip to content

POC Varchar default mapping to utf8view #16142

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 14 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ config_namespace! {
/// If true, `VARCHAR` is mapped to `Utf8View` during SQL planning.
/// If false, `VARCHAR` is mapped to `Utf8` during SQL planning.
/// Default is true.
pub map_varchar_to_utf8view: bool, default = false
pub map_varchar_to_utf8view: bool, default = true

/// When set to true, the source locations relative to the original SQL
/// query (i.e. [`Span`](https://docs.rs/sqlparser/latest/sqlparser/tokenizer/struct.Span.html)) will be collected
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/tests/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2504,6 +2504,11 @@ async fn write_table_with_order() -> Result<()> {
write_df = write_df
.with_column_renamed("column1", "tablecol1")
.unwrap();

// Ensure the column type matches the target table
write_df =
write_df.with_column("tablecol1", cast(col("tablecol1"), DataType::Utf8View))?;

let sql_str =
"create external table data(tablecol1 varchar) stored as parquet location '"
.to_owned()
Expand Down
48 changes: 24 additions & 24 deletions datafusion/core/tests/sql/explain_analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,9 @@ async fn csv_explain_plans() {
// Verify schema
let expected = vec![
"Explain [plan_type:Utf8, plan:Utf8]",
" Projection: aggregate_test_100.c1 [c1:Utf8]",
" Filter: aggregate_test_100.c2 > Int64(10) [c1:Utf8, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8]",
" TableScan: aggregate_test_100 [c1:Utf8, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8]",
" Projection: aggregate_test_100.c1 [c1:Utf8View]",
" Filter: aggregate_test_100.c2 > Int64(10) [c1:Utf8View, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8View]",
" TableScan: aggregate_test_100 [c1:Utf8View, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8View]",
];
let formatted = plan.display_indent_schema().to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();
Expand Down Expand Up @@ -222,11 +222,11 @@ async fn csv_explain_plans() {
" {",
" graph[label=\"Detailed LogicalPlan\"]",
" 7[shape=box label=\"Explain\\nSchema: [plan_type:Utf8, plan:Utf8]\"]",
" 8[shape=box label=\"Projection: aggregate_test_100.c1\\nSchema: [c1:Utf8]\"]",
" 8[shape=box label=\"Projection: aggregate_test_100.c1\\nSchema: [c1:Utf8View]\"]",
" 7 -> 8 [arrowhead=none, arrowtail=normal, dir=back]",
" 9[shape=box label=\"Filter: aggregate_test_100.c2 > Int64(10)\\nSchema: [c1:Utf8, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8]\"]",
" 9[shape=box label=\"Filter: aggregate_test_100.c2 > Int64(10)\\nSchema: [c1:Utf8View, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8View]\"]",
" 8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]",
" 10[shape=box label=\"TableScan: aggregate_test_100\\nSchema: [c1:Utf8, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8]\"]",
" 10[shape=box label=\"TableScan: aggregate_test_100\\nSchema: [c1:Utf8View, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8View]\"]",
" 9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]",
" }",
"}",
Expand All @@ -250,9 +250,9 @@ async fn csv_explain_plans() {
// Verify schema
let expected = vec![
"Explain [plan_type:Utf8, plan:Utf8]",
" Projection: aggregate_test_100.c1 [c1:Utf8]",
" Filter: aggregate_test_100.c2 > Int8(10) [c1:Utf8, c2:Int8]",
" TableScan: aggregate_test_100 projection=[c1, c2], partial_filters=[aggregate_test_100.c2 > Int8(10)] [c1:Utf8, c2:Int8]",
" Projection: aggregate_test_100.c1 [c1:Utf8View]",
" Filter: aggregate_test_100.c2 > Int8(10) [c1:Utf8View, c2:Int8]",
" TableScan: aggregate_test_100 projection=[c1, c2], partial_filters=[aggregate_test_100.c2 > Int8(10)] [c1:Utf8View, c2:Int8]",
];
let formatted = plan.display_indent_schema().to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();
Expand Down Expand Up @@ -296,11 +296,11 @@ async fn csv_explain_plans() {
" {",
" graph[label=\"Detailed LogicalPlan\"]",
" 7[shape=box label=\"Explain\\nSchema: [plan_type:Utf8, plan:Utf8]\"]",
" 8[shape=box label=\"Projection: aggregate_test_100.c1\\nSchema: [c1:Utf8]\"]",
" 8[shape=box label=\"Projection: aggregate_test_100.c1\\nSchema: [c1:Utf8View]\"]",
" 7 -> 8 [arrowhead=none, arrowtail=normal, dir=back]",
" 9[shape=box label=\"Filter: aggregate_test_100.c2 > Int8(10)\\nSchema: [c1:Utf8, c2:Int8]\"]",
" 9[shape=box label=\"Filter: aggregate_test_100.c2 > Int8(10)\\nSchema: [c1:Utf8View, c2:Int8]\"]",
" 8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]",
" 10[shape=box label=\"TableScan: aggregate_test_100 projection=[c1, c2], partial_filters=[aggregate_test_100.c2 > Int8(10)]\\nSchema: [c1:Utf8, c2:Int8]\"]",
" 10[shape=box label=\"TableScan: aggregate_test_100 projection=[c1, c2], partial_filters=[aggregate_test_100.c2 > Int8(10)]\\nSchema: [c1:Utf8View, c2:Int8]\"]",
" 9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]",
" }",
"}",
Expand Down Expand Up @@ -398,9 +398,9 @@ async fn csv_explain_verbose_plans() {
// Verify schema
let expected = vec![
"Explain [plan_type:Utf8, plan:Utf8]",
" Projection: aggregate_test_100.c1 [c1:Utf8]",
" Filter: aggregate_test_100.c2 > Int64(10) [c1:Utf8, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8]",
" TableScan: aggregate_test_100 [c1:Utf8, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8]",
" Projection: aggregate_test_100.c1 [c1:Utf8View]",
" Filter: aggregate_test_100.c2 > Int64(10) [c1:Utf8View, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8View]",
" TableScan: aggregate_test_100 [c1:Utf8View, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8View]",
];
let formatted = dataframe.logical_plan().display_indent_schema().to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();
Expand Down Expand Up @@ -444,11 +444,11 @@ async fn csv_explain_verbose_plans() {
" {",
" graph[label=\"Detailed LogicalPlan\"]",
" 7[shape=box label=\"Explain\\nSchema: [plan_type:Utf8, plan:Utf8]\"]",
" 8[shape=box label=\"Projection: aggregate_test_100.c1\\nSchema: [c1:Utf8]\"]",
" 8[shape=box label=\"Projection: aggregate_test_100.c1\\nSchema: [c1:Utf8View]\"]",
" 7 -> 8 [arrowhead=none, arrowtail=normal, dir=back]",
" 9[shape=box label=\"Filter: aggregate_test_100.c2 > Int64(10)\\nSchema: [c1:Utf8, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8]\"]",
" 9[shape=box label=\"Filter: aggregate_test_100.c2 > Int64(10)\\nSchema: [c1:Utf8View, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8View]\"]",
" 8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]",
" 10[shape=box label=\"TableScan: aggregate_test_100\\nSchema: [c1:Utf8, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8]\"]",
" 10[shape=box label=\"TableScan: aggregate_test_100\\nSchema: [c1:Utf8View, c2:Int8, c3:Int16, c4:Int16, c5:Int32, c6:Int64, c7:Int16, c8:Int32, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8View]\"]",
" 9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]",
" }",
"}",
Expand All @@ -472,9 +472,9 @@ async fn csv_explain_verbose_plans() {
// Verify schema
let expected = vec![
"Explain [plan_type:Utf8, plan:Utf8]",
" Projection: aggregate_test_100.c1 [c1:Utf8]",
" Filter: aggregate_test_100.c2 > Int8(10) [c1:Utf8, c2:Int8]",
" TableScan: aggregate_test_100 projection=[c1, c2], partial_filters=[aggregate_test_100.c2 > Int8(10)] [c1:Utf8, c2:Int8]",
" Projection: aggregate_test_100.c1 [c1:Utf8View]",
" Filter: aggregate_test_100.c2 > Int8(10) [c1:Utf8View, c2:Int8]",
" TableScan: aggregate_test_100 projection=[c1, c2], partial_filters=[aggregate_test_100.c2 > Int8(10)] [c1:Utf8View, c2:Int8]",
];
let formatted = plan.display_indent_schema().to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();
Expand Down Expand Up @@ -518,11 +518,11 @@ async fn csv_explain_verbose_plans() {
" {",
" graph[label=\"Detailed LogicalPlan\"]",
" 7[shape=box label=\"Explain\\nSchema: [plan_type:Utf8, plan:Utf8]\"]",
" 8[shape=box label=\"Projection: aggregate_test_100.c1\\nSchema: [c1:Utf8]\"]",
" 8[shape=box label=\"Projection: aggregate_test_100.c1\\nSchema: [c1:Utf8View]\"]",
" 7 -> 8 [arrowhead=none, arrowtail=normal, dir=back]",
" 9[shape=box label=\"Filter: aggregate_test_100.c2 > Int8(10)\\nSchema: [c1:Utf8, c2:Int8]\"]",
" 9[shape=box label=\"Filter: aggregate_test_100.c2 > Int8(10)\\nSchema: [c1:Utf8View, c2:Int8]\"]",
" 8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]",
" 10[shape=box label=\"TableScan: aggregate_test_100 projection=[c1, c2], partial_filters=[aggregate_test_100.c2 > Int8(10)]\\nSchema: [c1:Utf8, c2:Int8]\"]",
" 10[shape=box label=\"TableScan: aggregate_test_100 projection=[c1, c2], partial_filters=[aggregate_test_100.c2 > Int8(10)]\\nSchema: [c1:Utf8View, c2:Int8]\"]",
" 9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]",
" }",
"}",
Expand Down
41 changes: 31 additions & 10 deletions datafusion/core/tests/user_defined/user_defined_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ use std::hash::Hash;
use std::task::{Context, Poll};
use std::{any::Any, collections::BTreeMap, fmt, sync::Arc};

use arrow::array::{Array, ArrayRef, StringViewArray};
use arrow::{
array::{Int64Array, StringArray},
datatypes::SchemaRef,
Expand Down Expand Up @@ -100,6 +101,7 @@ use datafusion_optimizer::AnalyzerRule;
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};

use async_trait::async_trait;
use datafusion_common::cast::as_string_view_array;
use futures::{Stream, StreamExt};

/// Execute the specified sql and return the resulting record batches
Expand Down Expand Up @@ -796,22 +798,30 @@ fn accumulate_batch(
k: &usize,
) -> BTreeMap<i64, String> {
let num_rows = input_batch.num_rows();

// Assuming the input columns are
// column[0]: customer_id / UTF8
// column[0]: customer_id / UTF8 or UTF8View
// column[1]: revenue: Int64
let customer_id =
as_string_array(input_batch.column(0)).expect("Column 0 is not customer_id");

let customer_id_column = input_batch.column(0);
let revenue = as_int64_array(input_batch.column(1)).unwrap();

for row in 0..num_rows {
add_row(
&mut top_values,
customer_id.value(row),
revenue.value(row),
k,
);
let customer_id = match customer_id_column.data_type() {
arrow::datatypes::DataType::Utf8 => {
let array = as_string_array(customer_id_column).unwrap();
array.value(row)
}
arrow::datatypes::DataType::Utf8View => {
let array = as_string_view_array(customer_id_column).unwrap();
array.value(row)
}
_ => panic!("Unsupported customer_id type"),
};

add_row(&mut top_values, customer_id, revenue.value(row), k);
}

top_values
}

Expand Down Expand Up @@ -843,11 +853,22 @@ impl Stream for TopKReader {
self.state.iter().rev().unzip();

let customer: Vec<&str> = customer.iter().map(|&s| &**s).collect();

let customer_array: ArrayRef = match schema.field(0).data_type() {
arrow::datatypes::DataType::Utf8 => {
Arc::new(StringArray::from(customer))
}
arrow::datatypes::DataType::Utf8View => {
Arc::new(StringViewArray::from(customer))
}
other => panic!("Unsupported customer_id output type: {other:?}"),
};

Poll::Ready(Some(
RecordBatch::try_new(
schema,
vec![
Arc::new(StringArray::from(customer)),
Arc::new(customer_array),
Arc::new(Int64Array::from(revenue)),
],
)
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl ParserOptions {
parse_float_as_decimal: false,
enable_ident_normalization: true,
support_varchar_with_length: true,
map_varchar_to_utf8view: false,
map_varchar_to_utf8view: true,
enable_options_value_normalization: false,
collect_spans: false,
}
Expand Down
6 changes: 3 additions & 3 deletions datafusion/sql/tests/sql_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3355,7 +3355,7 @@ fn parse_decimals_parser_options() -> ParserOptions {
parse_float_as_decimal: true,
enable_ident_normalization: false,
support_varchar_with_length: false,
map_varchar_to_utf8view: false,
map_varchar_to_utf8view: true,
enable_options_value_normalization: false,
collect_spans: false,
}
Expand All @@ -3366,7 +3366,7 @@ fn ident_normalization_parser_options_no_ident_normalization() -> ParserOptions
parse_float_as_decimal: true,
enable_ident_normalization: false,
support_varchar_with_length: false,
map_varchar_to_utf8view: false,
map_varchar_to_utf8view: true,
enable_options_value_normalization: false,
collect_spans: false,
}
Expand All @@ -3377,7 +3377,7 @@ fn ident_normalization_parser_options_ident_normalization() -> ParserOptions {
parse_float_as_decimal: true,
enable_ident_normalization: true,
support_varchar_with_length: false,
map_varchar_to_utf8view: false,
map_varchar_to_utf8view: true,
enable_options_value_normalization: false,
collect_spans: false,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,9 @@ pub fn convert_schema_to_types(columns: &Fields) -> Vec<DFColumnType> {
if key_type.is_integer() {
// mapping dictionary string types to Text
match value_type.as_ref() {
DataType::Utf8 | DataType::LargeUtf8 => DFColumnType::Text,
DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
DFColumnType::Text
}
_ => DFColumnType::Another,
}
} else {
Expand Down
Loading
Loading