Skip to content

Commit 7df36ed

Browse files
committed
update
1 parent a13e083 commit 7df36ed

File tree

4 files changed

+45
-45
lines changed

4 files changed

+45
-45
lines changed

datafusion/catalog-listing/src/helpers.rs

Lines changed: 7 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
//! Helper functions for the table implementation
1919
20-
use std::mem;
2120
use std::sync::Arc;
2221

2322
use datafusion_catalog::Session;
@@ -40,7 +39,6 @@ use log::{debug, trace};
4039

4140
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
4241
use datafusion_common::{Column, DFSchema, DataFusionError};
43-
use datafusion_datasource::file_groups::FileGroup;
4442
use datafusion_expr::{Expr, Volatility};
4543
use datafusion_physical_expr::create_physical_expr;
4644
use object_store::path::Path;
@@ -122,39 +120,6 @@ pub fn expr_applicable_for_cols(col_names: &[&str], expr: &Expr) -> bool {
122120
/// The maximum number of concurrent listing requests
123121
const CONCURRENCY_LIMIT: usize = 100;
124122

125-
/// Partition the list of files into `n` groups
126-
pub fn split_files(mut file_group: FileGroup, n: usize) -> Vec<FileGroup> {
127-
if file_group.is_empty() {
128-
return vec![];
129-
}
130-
131-
// ObjectStore::list does not guarantee any consistent order and for some
132-
// implementations such as LocalFileSystem, it may be inconsistent. Thus
133-
// Sort files by path to ensure consistent plans when run more than once.
134-
file_group.files.sort_by(|a, b| a.path().cmp(b.path()));
135-
136-
// effectively this is div with rounding up instead of truncating
137-
let chunk_size = file_group.len().div_ceil(n);
138-
let mut chunks = Vec::with_capacity(n);
139-
let mut current_chunk = Vec::with_capacity(chunk_size);
140-
for file in file_group.files.drain(..) {
141-
current_chunk.push(file);
142-
if current_chunk.len() == chunk_size {
143-
let full_chunk = FileGroup::new(mem::replace(
144-
&mut current_chunk,
145-
Vec::with_capacity(chunk_size),
146-
));
147-
chunks.push(full_chunk);
148-
}
149-
}
150-
151-
if !current_chunk.is_empty() {
152-
chunks.push(FileGroup::new(current_chunk))
153-
}
154-
155-
chunks
156-
}
157-
158123
pub struct Partition {
159124
/// The path to the partition, including the table prefix
160125
path: Path,
@@ -535,13 +500,13 @@ pub fn describe_partition(partition: &Partition) -> (&str, usize, Vec<&str>) {
535500
mod tests {
536501
use async_trait::async_trait;
537502
use datafusion_common::config::TableOptions;
503+
use datafusion_datasource::file_groups::FileGroup;
538504
use datafusion_execution::config::SessionConfig;
539505
use datafusion_execution::runtime_env::RuntimeEnv;
540506
use futures::FutureExt;
541507
use object_store::memory::InMemory;
542508
use std::any::Any;
543509
use std::ops::Not;
544-
// use futures::StreamExt;
545510

546511
use super::*;
547512
use datafusion_expr::{
@@ -561,32 +526,33 @@ mod tests {
561526
new_partitioned_file("e"),
562527
]);
563528

564-
let chunks = split_files(files.clone(), 1);
529+
let chunks = files.clone().split_files(1);
565530
assert_eq!(1, chunks.len());
566531
assert_eq!(5, chunks[0].len());
567532

568-
let chunks = split_files(files.clone(), 2);
533+
let chunks = files.clone().split_files(2);
569534
assert_eq!(2, chunks.len());
570535
assert_eq!(3, chunks[0].len());
571536
assert_eq!(2, chunks[1].len());
572537

573-
let chunks = split_files(files.clone(), 5);
538+
let chunks = files.clone().split_files(5);
574539
assert_eq!(5, chunks.len());
575540
assert_eq!(1, chunks[0].len());
576541
assert_eq!(1, chunks[1].len());
577542
assert_eq!(1, chunks[2].len());
578543
assert_eq!(1, chunks[3].len());
579544
assert_eq!(1, chunks[4].len());
580545

581-
let chunks = split_files(files, 123);
546+
let chunks = files.clone().split_files(123);
582547
assert_eq!(5, chunks.len());
583548
assert_eq!(1, chunks[0].len());
584549
assert_eq!(1, chunks[1].len());
585550
assert_eq!(1, chunks[2].len());
586551
assert_eq!(1, chunks[3].len());
587552
assert_eq!(1, chunks[4].len());
588553

589-
let chunks = split_files(FileGroup::default(), 2);
554+
let mut empty_group = FileGroup::default();
555+
let chunks = empty_group.split_files(2);
590556
assert_eq!(0, chunks.len());
591557
}
592558

datafusion/core/src/datasource/listing/table.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
use std::collections::HashMap;
2121
use std::{any::Any, str::FromStr, sync::Arc};
2222

23-
use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files};
23+
use super::helpers::{expr_applicable_for_cols, pruned_partition_list};
2424
use super::{ListingTableUrl, PartitionedFile};
2525

2626
use crate::datasource::{
@@ -1128,7 +1128,7 @@ impl ListingTable {
11281128
.boxed()
11291129
.buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency);
11301130

1131-
let (files, statistics) = get_statistics_with_limit(
1131+
let (mut files, statistics) = get_statistics_with_limit(
11321132
files,
11331133
self.schema(),
11341134
limit,
@@ -1137,7 +1137,7 @@ impl ListingTable {
11371137
.await?;
11381138

11391139
Ok((
1140-
split_files(files, self.options.target_partitions),
1140+
files.split_files(self.options.target_partitions),
11411141
statistics,
11421142
))
11431143
}

datafusion/datasource/src/file_groups.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use itertools::Itertools;
2323
use std::cmp::min;
2424
use std::collections::BinaryHeap;
2525
use std::iter::repeat_with;
26+
use std::mem;
2627
use std::ops::{Index, IndexMut};
2728

2829
/// Repartition input files into `target_partitions` partitions, if total file size exceed
@@ -407,6 +408,39 @@ impl FileGroup {
407408
pub fn push(&mut self, file: PartitionedFile) {
408409
self.files.push(file);
409410
}
411+
412+
/// Partition the list of files into `n` groups
413+
pub fn split_files(&mut self, n: usize) -> Vec<FileGroup> {
414+
if self.is_empty() {
415+
return vec![];
416+
}
417+
418+
// ObjectStore::list does not guarantee any consistent order and for some
419+
// implementations such as LocalFileSystem, it may be inconsistent. Thus
420+
// Sort files by path to ensure consistent plans when run more than once.
421+
self.files.sort_by(|a, b| a.path().cmp(b.path()));
422+
423+
// effectively this is div with rounding up instead of truncating
424+
let chunk_size = self.len().div_ceil(n);
425+
let mut chunks = Vec::with_capacity(n);
426+
let mut current_chunk = Vec::with_capacity(chunk_size);
427+
for file in self.files.drain(..) {
428+
current_chunk.push(file);
429+
if current_chunk.len() == chunk_size {
430+
let full_chunk = FileGroup::new(mem::replace(
431+
&mut current_chunk,
432+
Vec::with_capacity(chunk_size),
433+
));
434+
chunks.push(full_chunk);
435+
}
436+
}
437+
438+
if !current_chunk.is_empty() {
439+
chunks.push(FileGroup::new(current_chunk))
440+
}
441+
442+
chunks
443+
}
410444
}
411445

412446
impl Index<usize> for FileGroup {

datafusion/datasource/src/file_scan_config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ use crate::{
7272
/// # use object_store::ObjectStore;
7373
/// # use datafusion_common::Statistics;
7474
/// # use datafusion_datasource::file::FileSource;
75-
/// use datafusion_datasource::file_groups::FileGroup;
75+
/// # use datafusion_datasource::file_groups::FileGroup;
7676
/// # use datafusion_datasource::PartitionedFile;
7777
/// # use datafusion_datasource::file_scan_config::FileScanConfig;
7878
/// # use datafusion_datasource::file_stream::FileOpener;

0 commit comments

Comments
 (0)