Skip to content

Commit 1dc5153

Browse files
committed
update
1 parent a13e083 commit 1dc5153

File tree

4 files changed

+45
-44
lines changed

4 files changed

+45
-44
lines changed

datafusion/catalog-listing/src/helpers.rs

Lines changed: 7 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
//! Helper functions for the table implementation
1919
20-
use std::mem;
2120
use std::sync::Arc;
2221

2322
use datafusion_catalog::Session;
@@ -40,7 +39,6 @@ use log::{debug, trace};
4039

4140
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
4241
use datafusion_common::{Column, DFSchema, DataFusionError};
43-
use datafusion_datasource::file_groups::FileGroup;
4442
use datafusion_expr::{Expr, Volatility};
4543
use datafusion_physical_expr::create_physical_expr;
4644
use object_store::path::Path;
@@ -122,39 +120,6 @@ pub fn expr_applicable_for_cols(col_names: &[&str], expr: &Expr) -> bool {
122120
/// The maximum number of concurrent listing requests
123121
const CONCURRENCY_LIMIT: usize = 100;
124122

125-
/// Partition the list of files into `n` groups
126-
pub fn split_files(mut file_group: FileGroup, n: usize) -> Vec<FileGroup> {
127-
if file_group.is_empty() {
128-
return vec![];
129-
}
130-
131-
// ObjectStore::list does not guarantee any consistent order and for some
132-
// implementations such as LocalFileSystem, it may be inconsistent. Thus
133-
// Sort files by path to ensure consistent plans when run more than once.
134-
file_group.files.sort_by(|a, b| a.path().cmp(b.path()));
135-
136-
// effectively this is div with rounding up instead of truncating
137-
let chunk_size = file_group.len().div_ceil(n);
138-
let mut chunks = Vec::with_capacity(n);
139-
let mut current_chunk = Vec::with_capacity(chunk_size);
140-
for file in file_group.files.drain(..) {
141-
current_chunk.push(file);
142-
if current_chunk.len() == chunk_size {
143-
let full_chunk = FileGroup::new(mem::replace(
144-
&mut current_chunk,
145-
Vec::with_capacity(chunk_size),
146-
));
147-
chunks.push(full_chunk);
148-
}
149-
}
150-
151-
if !current_chunk.is_empty() {
152-
chunks.push(FileGroup::new(current_chunk))
153-
}
154-
155-
chunks
156-
}
157-
158123
pub struct Partition {
159124
/// The path to the partition, including the table prefix
160125
path: Path,
@@ -541,6 +506,7 @@ mod tests {
541506
use object_store::memory::InMemory;
542507
use std::any::Any;
543508
use std::ops::Not;
509+
use datafusion_datasource::file_groups::FileGroup;
544510
// use futures::StreamExt;
545511

546512
use super::*;
@@ -561,32 +527,33 @@ mod tests {
561527
new_partitioned_file("e"),
562528
]);
563529

564-
let chunks = split_files(files.clone(), 1);
530+
let chunks = files.clone().split_files(1);
565531
assert_eq!(1, chunks.len());
566532
assert_eq!(5, chunks[0].len());
567533

568-
let chunks = split_files(files.clone(), 2);
534+
let chunks = files.clone().split_files(2);
569535
assert_eq!(2, chunks.len());
570536
assert_eq!(3, chunks[0].len());
571537
assert_eq!(2, chunks[1].len());
572538

573-
let chunks = split_files(files.clone(), 5);
539+
let chunks = files.clone().split_files(5);
574540
assert_eq!(5, chunks.len());
575541
assert_eq!(1, chunks[0].len());
576542
assert_eq!(1, chunks[1].len());
577543
assert_eq!(1, chunks[2].len());
578544
assert_eq!(1, chunks[3].len());
579545
assert_eq!(1, chunks[4].len());
580546

581-
let chunks = split_files(files, 123);
547+
let chunks = files.clone().split_files(123);
582548
assert_eq!(5, chunks.len());
583549
assert_eq!(1, chunks[0].len());
584550
assert_eq!(1, chunks[1].len());
585551
assert_eq!(1, chunks[2].len());
586552
assert_eq!(1, chunks[3].len());
587553
assert_eq!(1, chunks[4].len());
588554

589-
let chunks = split_files(FileGroup::default(), 2);
555+
let mut empty_group = FileGroup::default();
556+
let chunks = empty_group.split_files(2);
590557
assert_eq!(0, chunks.len());
591558
}
592559

datafusion/core/src/datasource/listing/table.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
use std::collections::HashMap;
2121
use std::{any::Any, str::FromStr, sync::Arc};
2222

23-
use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files};
23+
use super::helpers::{expr_applicable_for_cols, pruned_partition_list};
2424
use super::{ListingTableUrl, PartitionedFile};
2525

2626
use crate::datasource::{
@@ -1128,7 +1128,7 @@ impl ListingTable {
11281128
.boxed()
11291129
.buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency);
11301130

1131-
let (files, statistics) = get_statistics_with_limit(
1131+
let (mut files, statistics) = get_statistics_with_limit(
11321132
files,
11331133
self.schema(),
11341134
limit,
@@ -1137,7 +1137,7 @@ impl ListingTable {
11371137
.await?;
11381138

11391139
Ok((
1140-
split_files(files, self.options.target_partitions),
1140+
files.split_files(self.options.target_partitions),
11411141
statistics,
11421142
))
11431143
}

datafusion/datasource/src/file_groups.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use itertools::Itertools;
2323
use std::cmp::min;
2424
use std::collections::BinaryHeap;
2525
use std::iter::repeat_with;
26+
use std::mem;
2627
use std::ops::{Index, IndexMut};
2728

2829
/// Repartition input files into `target_partitions` partitions, if total file size exceed
@@ -407,6 +408,39 @@ impl FileGroup {
407408
pub fn push(&mut self, file: PartitionedFile) {
408409
self.files.push(file);
409410
}
411+
412+
/// Partition the list of files into `n` groups
413+
pub fn split_files(&mut self, n: usize) -> Vec<FileGroup> {
414+
if self.is_empty() {
415+
return vec![];
416+
}
417+
418+
// ObjectStore::list does not guarantee any consistent order and for some
419+
// implementations such as LocalFileSystem, it may be inconsistent. Thus
420+
// Sort files by path to ensure consistent plans when run more than once.
421+
self.files.sort_by(|a, b| a.path().cmp(b.path()));
422+
423+
// effectively this is div with rounding up instead of truncating
424+
let chunk_size = self.len().div_ceil(n);
425+
let mut chunks = Vec::with_capacity(n);
426+
let mut current_chunk = Vec::with_capacity(chunk_size);
427+
for file in self.files.drain(..) {
428+
current_chunk.push(file);
429+
if current_chunk.len() == chunk_size {
430+
let full_chunk = FileGroup::new(mem::replace(
431+
&mut current_chunk,
432+
Vec::with_capacity(chunk_size),
433+
));
434+
chunks.push(full_chunk);
435+
}
436+
}
437+
438+
if !current_chunk.is_empty() {
439+
chunks.push(FileGroup::new(current_chunk))
440+
}
441+
442+
chunks
443+
}
410444
}
411445

412446
impl Index<usize> for FileGroup {

datafusion/datasource/src/file_scan_config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ use crate::{
7272
/// # use object_store::ObjectStore;
7373
/// # use datafusion_common::Statistics;
7474
/// # use datafusion_datasource::file::FileSource;
75-
/// use datafusion_datasource::file_groups::FileGroup;
75+
/// # use datafusion_datasource::file_groups::FileGroup;
7676
/// # use datafusion_datasource::PartitionedFile;
7777
/// # use datafusion_datasource::file_scan_config::FileScanConfig;
7878
/// # use datafusion_datasource::file_stream::FileOpener;

0 commit comments

Comments
 (0)