Skip to content

Commit a0a063d

Browse files
authored
Perf: Support Utf8View datatype single column comparisons for SortPreservingMergeStream (#15348)
* Perf: Support Utf8View datatype single column comparisons for SortPreservingMergeStream * Add safety and bench sql * fix * Fix * Add benchmark testing
1 parent 0b76ac8 commit a0a063d

File tree

4 files changed

+172
-5
lines changed

4 files changed

+172
-5
lines changed

benchmarks/src/sort_tpch.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ impl RunOpt {
9292
/// Payload Columns:
9393
/// - Thin variant: `l_partkey` column with `BIGINT` type (1 column)
9494
/// - Wide variant: all columns except for possible key columns (12 columns)
95-
const SORT_QUERIES: [&'static str; 10] = [
95+
const SORT_QUERIES: [&'static str; 11] = [
9696
// Q1: 1 sort key (type: INTEGER, cardinality: 7) + 1 payload column
9797
r#"
9898
SELECT l_linenumber, l_partkey
@@ -159,6 +159,12 @@ impl RunOpt {
159159
FROM lineitem
160160
ORDER BY l_orderkey, l_suppkey, l_linenumber, l_comment
161161
"#,
162+
// Q11: 1 sort key (type: VARCHAR, cardinality: 4.5M) + 1 payload column
163+
r#"
164+
SELECT l_shipmode, l_comment, l_partkey
165+
FROM lineitem
166+
ORDER BY l_shipmode;
167+
"#,
162168
];
163169

164170
/// If query is specified from command line, run only that query.

datafusion/core/benches/sort.rs

Lines changed: 109 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,13 @@
6868
6969
use std::sync::Arc;
7070

71+
use arrow::array::StringViewArray;
7172
use arrow::{
7273
array::{DictionaryArray, Float64Array, Int64Array, StringArray},
7374
compute::SortOptions,
7475
datatypes::{Int32Type, Schema},
7576
record_batch::RecordBatch,
7677
};
77-
7878
use datafusion::physical_plan::sorts::sort::SortExec;
7979
use datafusion::{
8080
execution::context::TaskContext,
@@ -114,11 +114,24 @@ fn criterion_benchmark(c: &mut Criterion) {
114114
("f64", &f64_streams),
115115
("utf8 low cardinality", &utf8_low_cardinality_streams),
116116
("utf8 high cardinality", &utf8_high_cardinality_streams),
117+
(
118+
"utf8 view low cardinality",
119+
&utf8_view_low_cardinality_streams,
120+
),
121+
(
122+
"utf8 view high cardinality",
123+
&utf8_view_high_cardinality_streams,
124+
),
117125
("utf8 tuple", &utf8_tuple_streams),
126+
("utf8 view tuple", &utf8_view_tuple_streams),
118127
("utf8 dictionary", &dictionary_streams),
119128
("utf8 dictionary tuple", &dictionary_tuple_streams),
120129
("mixed dictionary tuple", &mixed_dictionary_tuple_streams),
121130
("mixed tuple", &mixed_tuple_streams),
131+
(
132+
"mixed tuple with utf8 view",
133+
&mixed_tuple_with_utf8_view_streams,
134+
),
122135
];
123136

124137
for (name, f) in cases {
@@ -308,6 +321,30 @@ fn utf8_low_cardinality_streams(sorted: bool) -> PartitionedBatches {
308321
})
309322
}
310323

324+
/// Create streams of random low cardinality utf8_view values
325+
fn utf8_view_low_cardinality_streams(sorted: bool) -> PartitionedBatches {
326+
let mut values = DataGenerator::new().utf8_low_cardinality_values();
327+
if sorted {
328+
values.sort_unstable();
329+
}
330+
split_tuples(values, |v| {
331+
let array: StringViewArray = v.into_iter().collect();
332+
RecordBatch::try_from_iter(vec![("utf_view_low", Arc::new(array) as _)]).unwrap()
333+
})
334+
}
335+
336+
/// Create streams of high cardinality (~ no duplicates) utf8_view values
337+
fn utf8_view_high_cardinality_streams(sorted: bool) -> PartitionedBatches {
338+
let mut values = DataGenerator::new().utf8_high_cardinality_values();
339+
if sorted {
340+
values.sort_unstable();
341+
}
342+
split_tuples(values, |v| {
343+
let array: StringViewArray = v.into_iter().collect();
344+
RecordBatch::try_from_iter(vec![("utf_view_high", Arc::new(array) as _)]).unwrap()
345+
})
346+
}
347+
311348
/// Create streams of high cardinality (~ no duplicates) utf8 values
312349
fn utf8_high_cardinality_streams(sorted: bool) -> PartitionedBatches {
313350
let mut values = DataGenerator::new().utf8_high_cardinality_values();
@@ -353,6 +390,39 @@ fn utf8_tuple_streams(sorted: bool) -> PartitionedBatches {
353390
})
354391
}
355392

393+
/// Create a batch of (utf8_view_low, utf8_view_low, utf8_view_high)
394+
fn utf8_view_tuple_streams(sorted: bool) -> PartitionedBatches {
395+
let mut gen = DataGenerator::new();
396+
397+
// need to sort by the combined key, so combine them together
398+
let mut tuples: Vec<_> = gen
399+
.utf8_low_cardinality_values()
400+
.into_iter()
401+
.zip(gen.utf8_low_cardinality_values())
402+
.zip(gen.utf8_high_cardinality_values())
403+
.collect();
404+
405+
if sorted {
406+
tuples.sort_unstable();
407+
}
408+
409+
split_tuples(tuples, |tuples| {
410+
let (tuples, utf8_high): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
411+
let (utf8_low1, utf8_low2): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
412+
413+
let utf8_view_high: StringViewArray = utf8_high.into_iter().collect();
414+
let utf8_view_low1: StringViewArray = utf8_low1.into_iter().collect();
415+
let utf8_view_low2: StringViewArray = utf8_low2.into_iter().collect();
416+
417+
RecordBatch::try_from_iter(vec![
418+
("utf_view_low1", Arc::new(utf8_view_low1) as _),
419+
("utf_view_low2", Arc::new(utf8_view_low2) as _),
420+
("utf_view_high", Arc::new(utf8_view_high) as _),
421+
])
422+
.unwrap()
423+
})
424+
}
425+
356426
/// Create a batch of (f64, utf8_low, utf8_low, i64)
357427
fn mixed_tuple_streams(sorted: bool) -> PartitionedBatches {
358428
let mut gen = DataGenerator::new();
@@ -391,6 +461,44 @@ fn mixed_tuple_streams(sorted: bool) -> PartitionedBatches {
391461
})
392462
}
393463

464+
/// Create a batch of (f64, utf8_view_low, utf8_view_low, i64)
465+
fn mixed_tuple_with_utf8_view_streams(sorted: bool) -> PartitionedBatches {
466+
let mut gen = DataGenerator::new();
467+
468+
// need to sort by the combined key, so combine them together
469+
let mut tuples: Vec<_> = gen
470+
.i64_values()
471+
.into_iter()
472+
.zip(gen.utf8_low_cardinality_values())
473+
.zip(gen.utf8_low_cardinality_values())
474+
.zip(gen.i64_values())
475+
.collect();
476+
477+
if sorted {
478+
tuples.sort_unstable();
479+
}
480+
481+
split_tuples(tuples, |tuples| {
482+
let (tuples, i64_values): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
483+
let (tuples, utf8_low2): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
484+
let (f64_values, utf8_low1): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
485+
486+
let f64_values: Float64Array = f64_values.into_iter().map(|v| v as f64).collect();
487+
488+
let utf8_view_low1: StringViewArray = utf8_low1.into_iter().collect();
489+
let utf8_view_low2: StringViewArray = utf8_low2.into_iter().collect();
490+
let i64_values: Int64Array = i64_values.into_iter().collect();
491+
492+
RecordBatch::try_from_iter(vec![
493+
("f64", Arc::new(f64_values) as _),
494+
("utf_view_low1", Arc::new(utf8_view_low1) as _),
495+
("utf_view_low2", Arc::new(utf8_view_low2) as _),
496+
("i64", Arc::new(i64_values) as _),
497+
])
498+
.unwrap()
499+
})
500+
}
501+
394502
/// Create a batch of (utf8_dict)
395503
fn dictionary_streams(sorted: bool) -> PartitionedBatches {
396504
let mut gen = DataGenerator::new();
@@ -402,7 +510,6 @@ fn dictionary_streams(sorted: bool) -> PartitionedBatches {
402510
split_tuples(values, |v| {
403511
let dictionary: DictionaryArray<Int32Type> =
404512
v.iter().map(Option::as_deref).collect();
405-
406513
RecordBatch::try_from_iter(vec![("dict", Arc::new(dictionary) as _)]).unwrap()
407514
})
408515
}

datafusion/physical-plan/src/sorts/cursor.rs

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
use std::cmp::Ordering;
1919

2020
use arrow::array::{
21-
types::ByteArrayType, Array, ArrowPrimitiveType, GenericByteArray, OffsetSizeTrait,
22-
PrimitiveArray,
21+
types::ByteArrayType, Array, ArrowPrimitiveType, GenericByteArray,
22+
GenericByteViewArray, OffsetSizeTrait, PrimitiveArray, StringViewArray,
2323
};
2424
use arrow::buffer::{Buffer, OffsetBuffer, ScalarBuffer};
2525
use arrow::compute::SortOptions;
@@ -281,6 +281,59 @@ impl<T: ByteArrayType> CursorArray for GenericByteArray<T> {
281281
}
282282
}
283283

284+
impl CursorArray for StringViewArray {
285+
type Values = StringViewArray;
286+
fn values(&self) -> Self {
287+
self.clone()
288+
}
289+
}
290+
291+
impl CursorValues for StringViewArray {
292+
fn len(&self) -> usize {
293+
self.views().len()
294+
}
295+
296+
fn eq(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> bool {
297+
// SAFETY: Both l_idx and r_idx are guaranteed to be within bounds,
298+
// and any null-checks are handled in the outer layers.
299+
// Fast path: Compare the lengths before full byte comparison.
300+
301+
let l_view = unsafe { l.views().get_unchecked(l_idx) };
302+
let l_len = *l_view as u32;
303+
let r_view = unsafe { r.views().get_unchecked(r_idx) };
304+
let r_len = *r_view as u32;
305+
if l_len != r_len {
306+
return false;
307+
}
308+
309+
unsafe { GenericByteViewArray::compare_unchecked(l, l_idx, r, r_idx).is_eq() }
310+
}
311+
312+
fn eq_to_previous(cursor: &Self, idx: usize) -> bool {
313+
// SAFETY: The caller guarantees that idx > 0 and the indices are valid.
314+
// Already checked it in is_eq_to_prev_one function
315+
// Fast path: Compare the lengths of the current and previous views.
316+
let l_view = unsafe { cursor.views().get_unchecked(idx) };
317+
let l_len = *l_view as u32;
318+
let r_view = unsafe { cursor.views().get_unchecked(idx - 1) };
319+
let r_len = *r_view as u32;
320+
if l_len != r_len {
321+
return false;
322+
}
323+
324+
unsafe {
325+
GenericByteViewArray::compare_unchecked(cursor, idx, cursor, idx - 1).is_eq()
326+
}
327+
}
328+
329+
fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering {
330+
// SAFETY: Prior assertions guarantee that l_idx and r_idx are valid indices.
331+
// Null-checks are assumed to have been handled in the wrapper (e.g., ArrayValues).
332+
// And the bound is checked in is_finished, it is safe to call get_unchecked
333+
unsafe { GenericByteViewArray::compare_unchecked(l, l_idx, r, r_idx) }
334+
}
335+
}
336+
284337
/// A collection of sorted, nullable [`CursorValues`]
285338
///
286339
/// Note: comparing cursors with different `SortOptions` will yield an arbitrary ordering

datafusion/physical-plan/src/sorts/streaming_merge.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ impl<'a> StreamingMergeBuilder<'a> {
177177
downcast_primitive! {
178178
data_type => (primitive_merge_helper, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker),
179179
DataType::Utf8 => merge_helper!(StringArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
180+
DataType::Utf8View => merge_helper!(StringViewArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
180181
DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
181182
DataType::Binary => merge_helper!(BinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
182183
DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)

0 commit comments

Comments
 (0)