Skip to content

Commit e08ef97

Browse files
authored
Introduce load-balanced split_groups_by_statistics method (#15473)
* Improve split_groups_by_statistics method
* add benchmark_group name
* use v2
* add comments and fix tests
* address partial
* refine doc
* rename
* add tests
* resolve conflict
* move notes
1 parent 5a335b8 commit e08ef97

File tree

7 files changed

+498
-12
lines changed

7 files changed

+498
-12
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/core/src/datasource/listing/table.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -897,10 +897,11 @@ impl TableProvider for ListingTable {
897897
.split_file_groups_by_statistics
898898
.then(|| {
899899
output_ordering.first().map(|output_ordering| {
900-
FileScanConfig::split_groups_by_statistics(
900+
FileScanConfig::split_groups_by_statistics_with_target_partitions(
901901
&self.table_schema,
902902
&partitioned_file_lists,
903903
output_ordering,
904+
self.options.target_partitions,
904905
)
905906
})
906907
})

datafusion/datasource/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ xz2 = { version = "0.1", optional = true, features = ["static"] }
7272
zstd = { version = "0.13", optional = true, default-features = false }
7373

7474
[dev-dependencies]
75+
criterion = { workspace = true }
7576
tempfile = { workspace = true }
7677

7778
[lints]
@@ -80,3 +81,7 @@ workspace = true
8081
[lib]
8182
name = "datafusion_datasource"
8283
path = "src/mod.rs"
84+
85+
[[bench]]
86+
name = "split_groups_by_statistics"
87+
harness = false
datafusion/datasource/benches/split_groups_by_statistics.rs

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow::datatypes::{DataType, Field, Schema};
19+
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
20+
use datafusion_datasource::file_scan_config::FileScanConfig;
21+
use datafusion_datasource::{generate_test_files, verify_sort_integrity};
22+
use datafusion_physical_expr::PhysicalSortExpr;
23+
use datafusion_physical_expr_common::sort_expr::LexOrdering;
24+
use std::sync::Arc;
25+
use std::time::Duration;
26+
27+
pub fn compare_split_groups_by_statistics_algorithms(c: &mut Criterion) {
28+
let file_schema = Arc::new(Schema::new(vec![Field::new(
29+
"value",
30+
DataType::Float64,
31+
false,
32+
)]));
33+
34+
let sort_expr = PhysicalSortExpr {
35+
expr: Arc::new(datafusion_physical_expr::expressions::Column::new(
36+
"value", 0,
37+
)),
38+
options: arrow::compute::SortOptions::default(),
39+
};
40+
let sort_ordering = LexOrdering::from(vec![sort_expr]);
41+
42+
// Small, medium, large number of files
43+
let file_counts = [10, 100, 1000];
44+
let overlap_factors = [0.0, 0.2, 0.5, 0.8]; // No, low, medium, high overlap
45+
46+
let target_partitions: [usize; 4] = [4, 8, 16, 32];
47+
48+
let mut group = c.benchmark_group("split_groups");
49+
group.measurement_time(Duration::from_secs(10));
50+
51+
for &num_files in &file_counts {
52+
for &overlap in &overlap_factors {
53+
let file_groups = generate_test_files(num_files, overlap);
54+
// Benchmark original algorithm
55+
group.bench_with_input(
56+
BenchmarkId::new(
57+
"original",
58+
format!("files={},overlap={:.1}", num_files, overlap),
59+
),
60+
&(
61+
file_groups.clone(),
62+
file_schema.clone(),
63+
sort_ordering.clone(),
64+
),
65+
|b, (fg, schema, order)| {
66+
let mut result = Vec::new();
67+
b.iter(|| {
68+
result =
69+
FileScanConfig::split_groups_by_statistics(schema, fg, order)
70+
.unwrap();
71+
});
72+
assert!(verify_sort_integrity(&result));
73+
},
74+
);
75+
76+
// Benchmark new algorithm with different target partitions
77+
for &tp in &target_partitions {
78+
group.bench_with_input(
79+
BenchmarkId::new(
80+
format!("v2_partitions={}", tp),
81+
format!("files={},overlap={:.1}", num_files, overlap),
82+
),
83+
&(
84+
file_groups.clone(),
85+
file_schema.clone(),
86+
sort_ordering.clone(),
87+
tp,
88+
),
89+
|b, (fg, schema, order, target)| {
90+
let mut result = Vec::new();
91+
b.iter(|| {
92+
result = FileScanConfig::split_groups_by_statistics_with_target_partitions(
93+
schema, fg, order, *target,
94+
)
95+
.unwrap();
96+
});
97+
assert!(verify_sort_integrity(&result));
98+
},
99+
);
100+
}
101+
}
102+
}
103+
104+
group.finish();
105+
}
106+
107+
// Register the comparison benchmark in a Criterion group named `benches`.
criterion_group!(benches, compare_split_groups_by_statistics_algorithms);
108+
// Expand to the benchmark binary's entry point, which runs the `benches` group.
criterion_main!(benches);

0 commit comments

Comments
 (0)