Commit bddb641
Reduce clone of Statistics in ListingTable and PartitionedFile (#11802)
* reduce clone of `Statistics` by using arc.
* optimize `get_statistics_with_limit` and `split_files`.
* directly create the col stats set.
* fix pb.
* fix fmt.
* fix clippy.
* fix compile.
* remove stale codes.
* optimize `split_files` by using drain.
* remove default for PartitionedFile.
* don't keep `Arc<Statistic>` in `PartitionedFile`.
* fix pb.
1 parent 16a3557 commit bddb641

File tree

4 files changed: +107 -97 lines changed

datafusion/core/src/datasource/listing/helpers.rs

Lines changed: 17 additions & 4 deletions
```diff
@@ -18,6 +18,7 @@
 //! Helper functions for the table implementation
 
 use std::collections::HashMap;
+use std::mem;
 use std::sync::Arc;
 
 use super::PartitionedFile;
@@ -138,10 +139,22 @@ pub fn split_files(
 
     // effectively this is div with rounding up instead of truncating
     let chunk_size = (partitioned_files.len() + n - 1) / n;
-    partitioned_files
-        .chunks(chunk_size)
-        .map(|c| c.to_vec())
-        .collect()
+    let mut chunks = Vec::with_capacity(n);
+    let mut current_chunk = Vec::with_capacity(chunk_size);
+    for file in partitioned_files.drain(..) {
+        current_chunk.push(file);
+        if current_chunk.len() == chunk_size {
+            let full_chunk =
+                mem::replace(&mut current_chunk, Vec::with_capacity(chunk_size));
+            chunks.push(full_chunk);
+        }
+    }
+
+    if !current_chunk.is_empty() {
+        chunks.push(current_chunk)
+    }
+
+    chunks
 }
 
 struct Partition {
```
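
The rewritten `split_files` moves each `PartitionedFile` out of the input via `drain(..)` instead of copying slices with `chunks(chunk_size).map(|c| c.to_vec())`, and `mem::replace` swaps a fresh buffer in whenever a chunk fills. A minimal standalone sketch of the same pattern, with `String` standing in for `PartitionedFile` (an illustration-only substitution):

```rust
use std::mem;

// Split `items` into at most `n` chunks, moving (not cloning) each element.
fn split_into_chunks(mut items: Vec<String>, n: usize) -> Vec<Vec<String>> {
    if items.is_empty() {
        return vec![];
    }
    // Div with rounding up, as in `split_files`.
    let chunk_size = (items.len() + n - 1) / n;
    let mut chunks = Vec::with_capacity(n);
    let mut current_chunk = Vec::with_capacity(chunk_size);
    for item in items.drain(..) {
        current_chunk.push(item);
        if current_chunk.len() == chunk_size {
            // Swap in a fresh buffer; the full chunk moves into `chunks`
            // without copying its elements.
            let full_chunk =
                mem::replace(&mut current_chunk, Vec::with_capacity(chunk_size));
            chunks.push(full_chunk);
        }
    }
    if !current_chunk.is_empty() {
        chunks.push(current_chunk);
    }
    chunks
}

fn main() {
    let files: Vec<String> = (0..5).map(|i| format!("file-{i}.parquet")).collect();
    let chunks = split_into_chunks(files, 2);
    // ceil(5 / 2) = 3, so the chunks have sizes 3 and 2.
    assert_eq!(chunks.iter().map(Vec::len).collect::<Vec<_>>(), vec![3, 2]);
}
```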

datafusion/core/src/datasource/listing/mod.rs

Lines changed: 1 addition & 0 deletions
```diff
@@ -82,6 +82,7 @@ pub struct PartitionedFile {
     /// An optional field for user defined per object metadata
     pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
 }
+
 impl PartitionedFile {
     /// Create a simple file without metadata or partition
    pub fn new(path: impl Into<String>, size: u64) -> Self {
```

datafusion/core/src/datasource/listing/table.rs

Lines changed: 14 additions & 12 deletions
```diff
@@ -973,15 +973,16 @@ impl ListingTable {
         // collect the statistics if required by the config
         let files = file_list
             .map(|part_file| async {
-                let mut part_file = part_file?;
+                let part_file = part_file?;
                 if self.options.collect_stat {
                     let statistics =
                         self.do_collect_statistics(ctx, &store, &part_file).await?;
-                    part_file.statistics = Some(statistics.clone());
-                    Ok((part_file, statistics)) as Result<(PartitionedFile, Statistics)>
+                    Ok((part_file, statistics))
                 } else {
-                    Ok((part_file, Statistics::new_unknown(&self.file_schema)))
-                        as Result<(PartitionedFile, Statistics)>
+                    Ok((
+                        part_file,
+                        Arc::new(Statistics::new_unknown(&self.file_schema)),
+                    ))
                 }
             })
             .boxed()
@@ -1011,12 +1012,12 @@ impl ListingTable {
         ctx: &SessionState,
         store: &Arc<dyn ObjectStore>,
         part_file: &PartitionedFile,
-    ) -> Result<Statistics> {
-        let statistics_cache = self.collected_statistics.clone();
-        return match statistics_cache
+    ) -> Result<Arc<Statistics>> {
+        match self
+            .collected_statistics
             .get_with_extra(&part_file.object_meta.location, &part_file.object_meta)
         {
-            Some(statistics) => Ok(statistics.as_ref().clone()),
+            Some(statistics) => Ok(statistics.clone()),
             None => {
                 let statistics = self
                     .options
@@ -1028,14 +1029,15 @@ impl ListingTable {
                         &part_file.object_meta,
                     )
                     .await?;
-                statistics_cache.put_with_extra(
+                let statistics = Arc::new(statistics);
+                self.collected_statistics.put_with_extra(
                     &part_file.object_meta.location,
-                    statistics.clone().into(),
+                    statistics.clone(),
                     &part_file.object_meta,
                 );
                 Ok(statistics)
             }
-        };
+        }
     }
 }
```
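
Since the cache now stores `Arc<Statistics>`, `do_collect_statistics` can return a shared handle: a hit clones the `Arc` (a reference-count bump) and a miss wraps the freshly computed statistics exactly once before insertion. A simplified sketch of the same get-or-insert shape, using a plain `HashMap` and hypothetical `StatsCache`/`Stats` types in place of DataFusion's cache and `Statistics`:

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Stand-in for DataFusion's `Statistics` (illustration only).
#[derive(Debug)]
struct Stats {
    num_rows: usize,
}

// Hypothetical cache keyed by file path, standing in for the
// `collected_statistics` cache on `ListingTable`.
struct StatsCache {
    inner: HashMap<String, Arc<Stats>>,
}

impl StatsCache {
    fn get_or_compute(
        &mut self,
        path: &str,
        compute: impl FnOnce() -> Stats,
    ) -> Arc<Stats> {
        match self.inner.get(path) {
            // Cache hit: cloning the `Arc` only bumps a reference count.
            Some(stats) => Arc::clone(stats),
            None => {
                // Cache miss: wrap the computed statistics in an `Arc`
                // once, store one handle, and return another.
                let stats = Arc::new(compute());
                self.inner.insert(path.to_string(), Arc::clone(&stats));
                stats
            }
        }
    }
}

fn main() {
    let mut cache = StatsCache { inner: HashMap::new() };
    let a = cache.get_or_compute("part-0.parquet", || Stats { num_rows: 100 });
    let b = cache.get_or_compute("part-0.parquet", || unreachable!("cached"));
    // Both handles point at the same allocation; nothing was deep-cloned.
    assert!(Arc::ptr_eq(&a, &b));
    assert_eq!(b.num_rows, 100);
}
```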

datafusion/core/src/datasource/statistics.rs

Lines changed: 75 additions & 81 deletions
```diff
@@ -15,6 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::mem;
+use std::sync::Arc;
+
 use super::listing::PartitionedFile;
 use crate::arrow::datatypes::{Schema, SchemaRef};
 use crate::error::Result;
@@ -26,16 +29,14 @@ use datafusion_common::stats::Precision;
 use datafusion_common::ScalarValue;
 
 use futures::{Stream, StreamExt};
-use itertools::izip;
-use itertools::multiunzip;
 
 /// Get all files as well as the file level summary statistics (no statistic for partition columns).
 /// If the optional `limit` is provided, includes only sufficient files. Needed to read up to
 /// `limit` number of rows. `collect_stats` is passed down from the configuration parameter on
 /// `ListingTable`. If it is false we only construct bare statistics and skip a potentially expensive
 /// call to `multiunzip` for constructing file level summary statistics.
 pub async fn get_statistics_with_limit(
-    all_files: impl Stream<Item = Result<(PartitionedFile, Statistics)>>,
+    all_files: impl Stream<Item = Result<(PartitionedFile, Arc<Statistics>)>>,
     file_schema: SchemaRef,
     limit: Option<usize>,
     collect_stats: bool,
```
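
Switching the stream items to `Arc<Statistics>` matters because cloning an `Arc` copies one pointer and bumps a reference count, while cloning a `Statistics` deep-copies every per-column value. A small sketch of that cost difference, with a stand-in struct (an assumption; the real `Statistics` holds `Precision` row counts and `ColumnStatistics`):

```rust
use std::sync::Arc;

// Stand-in for DataFusion's `Statistics` (illustration only).
#[derive(Clone)]
struct Statistics {
    column_mins: Vec<i64>,
}

fn main() {
    let stats = Arc::new(Statistics {
        column_mins: vec![0; 1024],
    });

    // Cheap: copies one pointer and bumps a reference count.
    let shared = Arc::clone(&stats);
    assert!(Arc::ptr_eq(&stats, &shared));

    // Expensive: deep-copies the 1024-element vector. The commit removes
    // clones like this from the listing hot path by passing
    // `Arc<Statistics>` between the cache, the file stream, and
    // `get_statistics_with_limit`.
    let deep = stats.as_ref().clone();
    assert_eq!(deep.column_mins.len(), 1024);
}
```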
```diff
@@ -48,26 +49,27 @@ pub async fn get_statistics_with_limit(
     // - zero for summations, and
     // - neutral element for extreme points.
     let size = file_schema.fields().len();
-    let mut null_counts: Vec<Precision<usize>> = vec![Precision::Absent; size];
-    let mut max_values: Vec<Precision<ScalarValue>> = vec![Precision::Absent; size];
-    let mut min_values: Vec<Precision<ScalarValue>> = vec![Precision::Absent; size];
+    let mut col_stats_set = vec![ColumnStatistics::default(); size];
     let mut num_rows = Precision::<usize>::Absent;
     let mut total_byte_size = Precision::<usize>::Absent;
 
     // Fusing the stream allows us to call next safely even once it is finished.
     let mut all_files = Box::pin(all_files.fuse());
 
     if let Some(first_file) = all_files.next().await {
-        let (file, file_stats) = first_file?;
+        let (mut file, file_stats) = first_file?;
+        file.statistics = Some(file_stats.as_ref().clone());
         result_files.push(file);
 
         // First file, we set them directly from the file statistics.
-        num_rows = file_stats.num_rows;
-        total_byte_size = file_stats.total_byte_size;
-        for (index, file_column) in file_stats.column_statistics.into_iter().enumerate() {
-            null_counts[index] = file_column.null_count;
-            max_values[index] = file_column.max_value;
-            min_values[index] = file_column.min_value;
+        num_rows = file_stats.num_rows.clone();
+        total_byte_size = file_stats.total_byte_size.clone();
+        for (index, file_column) in
+            file_stats.column_statistics.clone().into_iter().enumerate()
+        {
+            col_stats_set[index].null_count = file_column.null_count;
+            col_stats_set[index].max_value = file_column.max_value;
+            col_stats_set[index].min_value = file_column.min_value;
         }
 
         // If the number of rows exceeds the limit, we can stop processing
@@ -80,7 +82,8 @@ pub async fn get_statistics_with_limit(
         };
         if conservative_num_rows <= limit.unwrap_or(usize::MAX) {
             while let Some(current) = all_files.next().await {
-                let (file, file_stats) = current?;
+                let (mut file, file_stats) = current?;
+                file.statistics = Some(file_stats.as_ref().clone());
                 result_files.push(file);
                 if !collect_stats {
                     continue;
@@ -90,38 +93,28 @@ pub async fn get_statistics_with_limit(
                 // counts across all the files in question. If any file does not
                 // provide any information or provides an inexact value, we demote
                 // the statistic precision to inexact.
-                num_rows = add_row_stats(file_stats.num_rows, num_rows);
+                num_rows = add_row_stats(file_stats.num_rows.clone(), num_rows);
 
                 total_byte_size =
-                    add_row_stats(file_stats.total_byte_size, total_byte_size);
+                    add_row_stats(file_stats.total_byte_size.clone(), total_byte_size);
 
-                (null_counts, max_values, min_values) = multiunzip(
-                    izip!(
-                        file_stats.column_statistics.into_iter(),
-                        null_counts.into_iter(),
-                        max_values.into_iter(),
-                        min_values.into_iter()
-                    )
-                    .map(
-                        |(
-                            ColumnStatistics {
-                                null_count: file_nc,
-                                max_value: file_max,
-                                min_value: file_min,
-                                distinct_count: _,
-                            },
-                            null_count,
-                            max_value,
-                            min_value,
-                        )| {
-                            (
-                                add_row_stats(file_nc, null_count),
-                                set_max_if_greater(file_max, max_value),
-                                set_min_if_lesser(file_min, min_value),
-                            )
-                        },
-                    ),
-                );
+                for (file_col_stats, col_stats) in file_stats
+                    .column_statistics
+                    .iter()
+                    .zip(col_stats_set.iter_mut())
+                {
+                    let ColumnStatistics {
+                        null_count: file_nc,
+                        max_value: file_max,
+                        min_value: file_min,
+                        distinct_count: _,
+                    } = file_col_stats;
+
+                    col_stats.null_count =
+                        add_row_stats(file_nc.clone(), col_stats.null_count.clone());
+                    set_max_if_greater(file_max, &mut col_stats.max_value);
+                    set_min_if_lesser(file_min, &mut col_stats.min_value)
+                }
 
                 // If the number of rows exceeds the limit, we can stop processing
                 // files. This only applies when we know the number of rows. It also
```
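
The new loop folds each file's column statistics into `col_stats_set` in place via `iter_mut()`, where the old `izip!`/`multiunzip` version tore down and rebuilt three vectors per file. The same shape in miniature, with plain integers standing in for the per-column accumulators (an illustration-only simplification):

```rust
fn main() {
    // Per-column null-count accumulators gathered so far.
    let mut null_counts = vec![0u64, 5, 2];
    // Null counts reported by the next file.
    let file_null_counts = [1u64, 0, 3];

    // One in-place pass over the accumulators: no intermediate vectors,
    // unlike a zip/unzip round trip that rebuilds them for every file.
    for (acc, file_nc) in null_counts.iter_mut().zip(file_null_counts.iter()) {
        *acc += file_nc;
    }

    assert_eq!(null_counts, vec![1, 5, 5]);
}
```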
```diff
@@ -139,7 +132,7 @@ pub async fn get_statistics_with_limit(
     let mut statistics = Statistics {
         num_rows,
         total_byte_size,
-        column_statistics: get_col_stats_vec(null_counts, max_values, min_values),
+        column_statistics: col_stats_set,
     };
     if all_files.next().await.is_some() {
         // If we still have files in the stream, it means that the limit kicked
@@ -182,21 +175,6 @@ fn add_row_stats(
     }
 }
 
-pub(crate) fn get_col_stats_vec(
-    null_counts: Vec<Precision<usize>>,
-    max_values: Vec<Precision<ScalarValue>>,
-    min_values: Vec<Precision<ScalarValue>>,
-) -> Vec<ColumnStatistics> {
-    izip!(null_counts, max_values, min_values)
-        .map(|(null_count, max_value, min_value)| ColumnStatistics {
-            null_count,
-            max_value,
-            min_value,
-            distinct_count: Precision::Absent,
-        })
-        .collect()
-}
-
 pub(crate) fn get_col_stats(
     schema: &Schema,
     null_counts: Vec<Precision<usize>>,
@@ -238,45 +216,61 @@ fn min_max_aggregate_data_type(input_type: &DataType) -> &DataType {
 /// If the given value is numerically greater than the original maximum value,
 /// return the new maximum value with appropriate exactness information.
 fn set_max_if_greater(
-    max_nominee: Precision<ScalarValue>,
-    max_values: Precision<ScalarValue>,
-) -> Precision<ScalarValue> {
-    match (&max_values, &max_nominee) {
-        (Precision::Exact(val1), Precision::Exact(val2)) if val1 < val2 => max_nominee,
+    max_nominee: &Precision<ScalarValue>,
+    max_value: &mut Precision<ScalarValue>,
+) {
+    match (&max_value, max_nominee) {
+        (Precision::Exact(val1), Precision::Exact(val2)) if val1 < val2 => {
+            *max_value = max_nominee.clone();
+        }
         (Precision::Exact(val1), Precision::Inexact(val2))
         | (Precision::Inexact(val1), Precision::Inexact(val2))
         | (Precision::Inexact(val1), Precision::Exact(val2))
             if val1 < val2 =>
         {
-            max_nominee.to_inexact()
+            *max_value = max_nominee.clone().to_inexact();
+        }
+        (Precision::Exact(_), Precision::Absent) => {
+            let exact_max = mem::take(max_value);
+            *max_value = exact_max.to_inexact();
+        }
+        (Precision::Absent, Precision::Exact(_)) => {
+            *max_value = max_nominee.clone().to_inexact();
+        }
+        (Precision::Absent, Precision::Inexact(_)) => {
+            *max_value = max_nominee.clone();
         }
-        (Precision::Exact(_), Precision::Absent) => max_values.to_inexact(),
-        (Precision::Absent, Precision::Exact(_)) => max_nominee.to_inexact(),
-        (Precision::Absent, Precision::Inexact(_)) => max_nominee,
-        (Precision::Absent, Precision::Absent) => Precision::Absent,
-        _ => max_values,
+        _ => {}
     }
 }
 
 /// If the given value is numerically lesser than the original minimum value,
 /// return the new minimum value with appropriate exactness information.
 fn set_min_if_lesser(
-    min_nominee: Precision<ScalarValue>,
-    min_values: Precision<ScalarValue>,
-) -> Precision<ScalarValue> {
-    match (&min_values, &min_nominee) {
-        (Precision::Exact(val1), Precision::Exact(val2)) if val1 > val2 => min_nominee,
+    min_nominee: &Precision<ScalarValue>,
+    min_value: &mut Precision<ScalarValue>,
+) {
+    match (&min_value, min_nominee) {
+        (Precision::Exact(val1), Precision::Exact(val2)) if val1 > val2 => {
+            *min_value = min_nominee.clone();
+        }
         (Precision::Exact(val1), Precision::Inexact(val2))
         | (Precision::Inexact(val1), Precision::Inexact(val2))
        | (Precision::Inexact(val1), Precision::Exact(val2))
             if val1 > val2 =>
         {
-            min_nominee.to_inexact()
+            *min_value = min_nominee.clone().to_inexact();
+        }
+        (Precision::Exact(_), Precision::Absent) => {
+            let exact_min = mem::take(min_value);
+            *min_value = exact_min.to_inexact();
+        }
+        (Precision::Absent, Precision::Exact(_)) => {
+            *min_value = min_nominee.clone().to_inexact();
+        }
+        (Precision::Absent, Precision::Inexact(_)) => {
+            *min_value = min_nominee.clone();
        }
-        (Precision::Exact(_), Precision::Absent) => min_values.to_inexact(),
-        (Precision::Absent, Precision::Exact(_)) => min_nominee.to_inexact(),
-        (Precision::Absent, Precision::Inexact(_)) => min_nominee,
-        (Precision::Absent, Precision::Absent) => Precision::Absent,
-        _ => min_values,
+        _ => {}
     }
 }
```
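
Both helpers now take the accumulator as `&mut` and write through it rather than consuming and returning `Precision` values; in the `(Exact, Absent)` arm, `mem::take` lifts the stored value out (leaving `Precision::Absent` behind) so it can be demoted with `to_inexact()` without cloning. A self-contained sketch with a minimal `Precision` enum (an assumption; the real type is `datafusion_common::stats::Precision`):

```rust
use std::mem;

// Minimal stand-in for `datafusion_common::stats::Precision`.
#[derive(Clone, Debug, Default, PartialEq)]
enum Precision<T> {
    Exact(T),
    Inexact(T),
    #[default]
    Absent,
}

impl<T> Precision<T> {
    fn to_inexact(self) -> Self {
        match self {
            Precision::Exact(v) => Precision::Inexact(v),
            other => other,
        }
    }
}

// In-place merge mirroring the patched `set_max_if_greater`, specialized
// to `i64` for brevity.
fn set_max_if_greater(max_nominee: &Precision<i64>, max_value: &mut Precision<i64>) {
    match (&max_value, max_nominee) {
        (Precision::Exact(v1), Precision::Exact(v2)) if v1 < v2 => {
            *max_value = max_nominee.clone();
        }
        (Precision::Exact(v1), Precision::Inexact(v2))
        | (Precision::Inexact(v1), Precision::Inexact(v2))
        | (Precision::Inexact(v1), Precision::Exact(v2))
            if v1 < v2 =>
        {
            *max_value = max_nominee.clone().to_inexact();
        }
        (Precision::Exact(_), Precision::Absent) => {
            // Take the stored value out (leaving `Absent` behind) and put
            // back its inexact form without cloning it.
            let exact_max = mem::take(max_value);
            *max_value = exact_max.to_inexact();
        }
        (Precision::Absent, Precision::Exact(_)) => {
            *max_value = max_nominee.clone().to_inexact();
        }
        (Precision::Absent, Precision::Inexact(_)) => {
            *max_value = max_nominee.clone();
        }
        _ => {}
    }
}

fn main() {
    let mut max = Precision::Exact(10);
    set_max_if_greater(&Precision::Inexact(42), &mut max);
    assert_eq!(max, Precision::Inexact(42));

    let mut max = Precision::Exact(10);
    set_max_if_greater(&Precision::Absent, &mut max);
    assert_eq!(max, Precision::Inexact(10)); // demoted, value kept
}
```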
