Skip to content

Commit ae0a57b

Browse files
jayzhan211, berkaysynnada
authored and committed
Round robin polling between tied winners in sort preserving merge (apache#13133)
* first draft Signed-off-by: jayzhan211 <[email protected]> * add data Signed-off-by: jayzhan211 <[email protected]> * fix benchmark Signed-off-by: jayzhan211 <[email protected]> * add more bencmark data Signed-off-by: jayzhan211 <[email protected]> * fix benchmark Signed-off-by: jayzhan211 <[email protected]> * fmt Signed-off-by: jayzhan211 <[email protected]> * get max size Signed-off-by: jayzhan211 <[email protected]> * add license Signed-off-by: jayzhan211 <[email protected]> * rm code for merge Signed-off-by: jayzhan211 <[email protected]> * cleanup Signed-off-by: jayzhan211 <[email protected]> * cleanup Signed-off-by: jayzhan211 <[email protected]> * update poll count only we have tie Signed-off-by: jayzhan211 <[email protected]> * upd comment Signed-off-by: jayzhan211 <[email protected]> * fix logic Signed-off-by: jayzhan211 <[email protected]> * configurable Signed-off-by: jayzhan211 <[email protected]> * fmt Signed-off-by: jayzhan211 <[email protected]> * add mem limit test Signed-off-by: jayzhan211 <[email protected]> * rm test Signed-off-by: jayzhan211 <[email protected]> * escape bracket Signed-off-by: jayzhan211 <[email protected]> * add test Signed-off-by: jayzhan211 <[email protected]> * rm per consumer record Signed-off-by: jayzhan211 <[email protected]> * repartition limit Signed-off-by: jayzhan211 <[email protected]> * add benchmark Signed-off-by: jayzhan211 <[email protected]> * cleanup Signed-off-by: jayzhan211 <[email protected]> * benchmark with parameter Signed-off-by: jayzhan211 <[email protected]> * only calculate consumer pool if the limit is set Signed-off-by: jayzhan211 <[email protected]> * combine eq and gt Signed-off-by: jayzhan211 <[email protected]> * review part 1 * Update merge.rs * upd doc Signed-off-by: jayzhan211 <[email protected]> * no need index comparison Signed-off-by: jayzhan211 <[email protected]> * combine handle tie and eq check Signed-off-by: jayzhan211 <[email protected]> * upd doc Signed-off-by: jayzhan211 <[email 
protected]> * fmt Signed-off-by: jayzhan211 <[email protected]> * add more comment Signed-off-by: jayzhan211 <[email protected]> * remove flag Signed-off-by: jayzhan211 <[email protected]> * upd comment Signed-off-by: jayzhan211 <[email protected]> * Revert "remove flag" This reverts commit 8d6c0a6. * Revert "upd comment" This reverts commit a18cba8. * add more comment Signed-off-by: jayzhan211 <[email protected]> * add more comment Signed-off-by: jayzhan211 <[email protected]> * fmt Signed-off-by: jayzhan211 <[email protected]> * simpliy mem pool Signed-off-by: jayzhan211 <[email protected]> * clippy Signed-off-by: jayzhan211 <[email protected]> * Update merge.rs * minor * add comment Signed-off-by: jayzhan211 <[email protected]> --------- Signed-off-by: jayzhan211 <[email protected]> Co-authored-by: berkaysynnada <[email protected]>
1 parent 50e1209 commit ae0a57b

File tree

6 files changed

+501
-24
lines changed

6 files changed

+501
-24
lines changed

datafusion/physical-plan/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ rand = { workspace = true }
6868
tokio = { workspace = true }
6969

7070
[dev-dependencies]
71+
criterion = { version = "0.5", features = ["async_futures"] }
7172
datafusion-functions-aggregate = { workspace = true }
7273
rstest = { workspace = true }
7374
rstest_reuse = "0.7.0"
@@ -76,3 +77,7 @@ tokio = { workspace = true, features = [
7677
"fs",
7778
"parking_lot",
7879
] }
80+
81+
[[bench]]
82+
harness = false
83+
name = "spm"
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::sync::Arc;
19+
20+
use arrow::record_batch::RecordBatch;
21+
use arrow_array::{ArrayRef, Int32Array, Int64Array, StringArray};
22+
use datafusion_execution::TaskContext;
23+
use datafusion_physical_expr::expressions::col;
24+
use datafusion_physical_expr::PhysicalSortExpr;
25+
use datafusion_physical_plan::memory::MemoryExec;
26+
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
27+
use datafusion_physical_plan::{collect, ExecutionPlan};
28+
29+
use criterion::async_executor::FuturesExecutor;
30+
use criterion::{black_box, criterion_group, criterion_main, Criterion};
31+
32+
fn generate_spm_for_round_robin_tie_breaker(
33+
has_same_value: bool,
34+
enable_round_robin_repartition: bool,
35+
batch_count: usize,
36+
partition_count: usize,
37+
) -> SortPreservingMergeExec {
38+
let row_size = 256;
39+
let rb = if has_same_value {
40+
let a: ArrayRef = Arc::new(Int32Array::from(vec![1; row_size]));
41+
let b: ArrayRef = Arc::new(StringArray::from_iter(vec![Some("a"); row_size]));
42+
let c: ArrayRef = Arc::new(Int64Array::from_iter(vec![0; row_size]));
43+
RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
44+
} else {
45+
let v = (0i32..row_size as i32).collect::<Vec<_>>();
46+
let a: ArrayRef = Arc::new(Int32Array::from(v));
47+
48+
// Use alphanumeric characters
49+
let charset: Vec<char> =
50+
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
51+
.chars()
52+
.collect();
53+
54+
let mut strings = Vec::new();
55+
for i in 0..256 {
56+
let mut s = String::new();
57+
s.push(charset[i % charset.len()]);
58+
s.push(charset[(i / charset.len()) % charset.len()]);
59+
strings.push(Some(s));
60+
}
61+
62+
let b: ArrayRef = Arc::new(StringArray::from_iter(strings));
63+
64+
let v = (0i64..row_size as i64).collect::<Vec<_>>();
65+
let c: ArrayRef = Arc::new(Int64Array::from_iter(v));
66+
RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
67+
};
68+
69+
let rbs = (0..batch_count).map(|_| rb.clone()).collect::<Vec<_>>();
70+
let partitiones = vec![rbs.clone(); partition_count];
71+
72+
let schema = rb.schema();
73+
let sort = vec![
74+
PhysicalSortExpr {
75+
expr: col("b", &schema).unwrap(),
76+
options: Default::default(),
77+
},
78+
PhysicalSortExpr {
79+
expr: col("c", &schema).unwrap(),
80+
options: Default::default(),
81+
},
82+
];
83+
84+
let exec = MemoryExec::try_new(&partitiones, schema, None).unwrap();
85+
SortPreservingMergeExec::new(sort, Arc::new(exec))
86+
.with_round_robin_repartition(enable_round_robin_repartition)
87+
}
88+
89+
fn run_bench(
90+
c: &mut Criterion,
91+
has_same_value: bool,
92+
enable_round_robin_repartition: bool,
93+
batch_count: usize,
94+
partition_count: usize,
95+
description: &str,
96+
) {
97+
let task_ctx = TaskContext::default();
98+
let task_ctx = Arc::new(task_ctx);
99+
100+
let spm = Arc::new(generate_spm_for_round_robin_tie_breaker(
101+
has_same_value,
102+
enable_round_robin_repartition,
103+
batch_count,
104+
partition_count,
105+
)) as Arc<dyn ExecutionPlan>;
106+
107+
c.bench_function(description, |b| {
108+
b.to_async(FuturesExecutor)
109+
.iter(|| black_box(collect(Arc::clone(&spm), Arc::clone(&task_ctx))))
110+
});
111+
}
112+
113+
fn criterion_benchmark(c: &mut Criterion) {
114+
let params = [
115+
(true, false, "low_card_without_tiebreaker"), // low cardinality, no tie breaker
116+
(true, true, "low_card_with_tiebreaker"), // low cardinality, with tie breaker
117+
(false, false, "high_card_without_tiebreaker"), // high cardinality, no tie breaker
118+
(false, true, "high_card_with_tiebreaker"), // high cardinality, with tie breaker
119+
];
120+
121+
let batch_counts = [1, 25, 625];
122+
let partition_counts = [2, 8, 32];
123+
124+
for &(has_same_value, enable_round_robin_repartition, cardinality_label) in &params {
125+
for &batch_count in &batch_counts {
126+
for &partition_count in &partition_counts {
127+
let description = format!(
128+
"{}_batch_count_{}_partition_count_{}",
129+
cardinality_label, batch_count, partition_count
130+
);
131+
run_bench(
132+
c,
133+
has_same_value,
134+
enable_round_robin_repartition,
135+
batch_count,
136+
partition_count,
137+
&description,
138+
);
139+
}
140+
}
141+
}
142+
}
143+
144+
criterion_group!(benches, criterion_benchmark);
145+
criterion_main!(benches);

datafusion/physical-plan/src/sorts/cursor.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ pub trait CursorValues {
3838
/// Returns true if `l[l_idx] == r[r_idx]`
3939
fn eq(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> bool;
4040

41+
/// Returns true if `row[idx] == row[idx - 1]`
42+
/// `idx` must be greater than 0.
43+
fn eq_to_previous(cursor: &Self, idx: usize) -> bool;
44+
4145
/// Returns comparison of `l[l_idx]` and `r[r_idx]`
4246
fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering;
4347
}
@@ -95,6 +99,16 @@ impl<T: CursorValues> Cursor<T> {
9599
self.offset += 1;
96100
t
97101
}
102+
103+
/// Returns true if the row at this cursor's current offset equals the row
/// immediately preceding it.
///
/// When the offset is greater than zero the comparison stays inside this
/// cursor's own values. At offset zero the current row is compared with the
/// last row of `prev_cursor` if one is supplied; with no previous cursor the
/// result is `false`.
pub fn is_eq_to_prev_one(&self, prev_cursor: Option<&Cursor<T>>) -> bool {
    match prev_cursor {
        _ if self.offset > 0 => self.is_eq_to_prev_row(),
        Some(previous) => self.is_eq_to_prev_row_in_prev_batch(previous),
        None => false,
    }
}
98112
}
99113

100114
impl<T: CursorValues> PartialEq for Cursor<T> {
@@ -103,6 +117,22 @@ impl<T: CursorValues> PartialEq for Cursor<T> {
103117
}
104118
}
105119

120+
impl<T: CursorValues> Cursor<T> {
    /// Compares the row at the current offset with the one directly before it
    /// in the same set of values.
    fn is_eq_to_prev_row(&self) -> bool {
        let current = self.offset;
        T::eq_to_previous(&self.values, current)
    }

    /// Compares this cursor's first row with the last row of `other`.
    ///
    /// Panics if this cursor is not positioned at offset 0.
    // NOTE(review): `len() - 1` underflows if `other` holds no rows; presumably
    // callers never pass an empty cursor here, but confirm.
    fn is_eq_to_prev_row_in_prev_batch(&self, other: &Self) -> bool {
        assert_eq!(self.offset, 0);
        let last_of_prev = other.values.len() - 1;
        T::eq(&self.values, self.offset, &other.values, last_of_prev)
    }
}
135+
106136
impl<T: CursorValues> Eq for Cursor<T> {}
107137

108138
impl<T: CursorValues> PartialOrd for Cursor<T> {
@@ -156,6 +186,11 @@ impl CursorValues for RowValues {
156186
l.rows.row(l_idx) == r.rows.row(r_idx)
157187
}
158188

189+
fn eq_to_previous(cursor: &Self, idx: usize) -> bool {
190+
assert!(idx > 0);
191+
cursor.rows.row(idx) == cursor.rows.row(idx - 1)
192+
}
193+
159194
fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering {
160195
l.rows.row(l_idx).cmp(&r.rows.row(r_idx))
161196
}
@@ -188,6 +223,11 @@ impl<T: ArrowNativeTypeOp> CursorValues for PrimitiveValues<T> {
188223
l.0[l_idx].is_eq(r.0[r_idx])
189224
}
190225

226+
fn eq_to_previous(cursor: &Self, idx: usize) -> bool {
227+
assert!(idx > 0);
228+
cursor.0[idx].is_eq(cursor.0[idx - 1])
229+
}
230+
191231
fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering {
192232
l.0[l_idx].compare(r.0[r_idx])
193233
}
@@ -219,6 +259,11 @@ impl<T: OffsetSizeTrait> CursorValues for ByteArrayValues<T> {
219259
l.value(l_idx) == r.value(r_idx)
220260
}
221261

262+
fn eq_to_previous(cursor: &Self, idx: usize) -> bool {
263+
assert!(idx > 0);
264+
cursor.value(idx) == cursor.value(idx - 1)
265+
}
266+
222267
fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering {
223268
l.value(l_idx).cmp(r.value(r_idx))
224269
}
@@ -284,6 +329,15 @@ impl<T: CursorValues> CursorValues for ArrayValues<T> {
284329
}
285330
}
286331

332+
fn eq_to_previous(cursor: &Self, idx: usize) -> bool {
333+
assert!(idx > 0);
334+
match (cursor.is_null(idx), cursor.is_null(idx - 1)) {
335+
(true, true) => true,
336+
(false, false) => T::eq(&cursor.values, idx, &cursor.values, idx - 1),
337+
_ => false,
338+
}
339+
}
340+
287341
fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering {
288342
match (l.is_null(l_idx), r.is_null(r_idx)) {
289343
(true, true) => Ordering::Equal,

0 commit comments

Comments
 (0)