Skip to content

Commit a13e083

Browse files
committed
Refactor: add FileGroup structure
1 parent 5210a2b commit a13e083

File tree

32 files changed

+374
-244
lines changed

32 files changed

+374
-244
lines changed

datafusion-examples/examples/parquet_exec_visitor.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
use std::sync::Arc;
1919

2020
use datafusion::datasource::file_format::parquet::ParquetFormat;
21-
use datafusion::datasource::listing::{ListingOptions, PartitionedFile};
22-
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
21+
use datafusion::datasource::listing::ListingOptions;
22+
use datafusion::datasource::physical_plan::{FileGroup, FileScanConfig, ParquetSource};
2323
use datafusion::datasource::source::DataSourceExec;
2424
use datafusion::error::DataFusionError;
2525
use datafusion::execution::context::SessionContext;
@@ -85,7 +85,7 @@ async fn main() {
8585
/// and `file_groups` from the FileScanConfig.
8686
#[derive(Debug)]
8787
struct ParquetExecVisitor {
88-
file_groups: Option<Vec<Vec<PartitionedFile>>>,
88+
file_groups: Option<Vec<FileGroup>>,
8989
bytes_scanned: Option<MetricValue>,
9090
}
9191

datafusion/catalog-listing/src/helpers.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use log::{debug, trace};
4040

4141
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
4242
use datafusion_common::{Column, DFSchema, DataFusionError};
43+
use datafusion_datasource::file_groups::FileGroup;
4344
use datafusion_expr::{Expr, Volatility};
4445
use datafusion_physical_expr::create_physical_expr;
4546
use object_store::path::Path;
@@ -122,34 +123,33 @@ pub fn expr_applicable_for_cols(col_names: &[&str], expr: &Expr) -> bool {
122123
const CONCURRENCY_LIMIT: usize = 100;
123124

124125
/// Partition the list of files into `n` groups
125-
pub fn split_files(
126-
mut partitioned_files: Vec<PartitionedFile>,
127-
n: usize,
128-
) -> Vec<Vec<PartitionedFile>> {
129-
if partitioned_files.is_empty() {
126+
pub fn split_files(mut file_group: FileGroup, n: usize) -> Vec<FileGroup> {
127+
if file_group.is_empty() {
130128
return vec![];
131129
}
132130

133131
// ObjectStore::list does not guarantee any consistent order and for some
134132
// implementations such as LocalFileSystem, it may be inconsistent. Thus
135133
// Sort files by path to ensure consistent plans when run more than once.
136-
partitioned_files.sort_by(|a, b| a.path().cmp(b.path()));
134+
file_group.files.sort_by(|a, b| a.path().cmp(b.path()));
137135

138136
// effectively this is div with rounding up instead of truncating
139-
let chunk_size = partitioned_files.len().div_ceil(n);
137+
let chunk_size = file_group.len().div_ceil(n);
140138
let mut chunks = Vec::with_capacity(n);
141139
let mut current_chunk = Vec::with_capacity(chunk_size);
142-
for file in partitioned_files.drain(..) {
140+
for file in file_group.files.drain(..) {
143141
current_chunk.push(file);
144142
if current_chunk.len() == chunk_size {
145-
let full_chunk =
146-
mem::replace(&mut current_chunk, Vec::with_capacity(chunk_size));
143+
let full_chunk = FileGroup::new(mem::replace(
144+
&mut current_chunk,
145+
Vec::with_capacity(chunk_size),
146+
));
147147
chunks.push(full_chunk);
148148
}
149149
}
150150

151151
if !current_chunk.is_empty() {
152-
chunks.push(current_chunk)
152+
chunks.push(FileGroup::new(current_chunk))
153153
}
154154

155155
chunks
@@ -553,13 +553,13 @@ mod tests {
553553
#[test]
554554
fn test_split_files() {
555555
let new_partitioned_file = |path: &str| PartitionedFile::new(path.to_owned(), 10);
556-
let files = vec![
556+
let files = FileGroup::new(vec![
557557
new_partitioned_file("a"),
558558
new_partitioned_file("b"),
559559
new_partitioned_file("c"),
560560
new_partitioned_file("d"),
561561
new_partitioned_file("e"),
562-
];
562+
]);
563563

564564
let chunks = split_files(files.clone(), 1);
565565
assert_eq!(1, chunks.len());
@@ -586,7 +586,7 @@ mod tests {
586586
assert_eq!(1, chunks[3].len());
587587
assert_eq!(1, chunks[4].len());
588588

589-
let chunks = split_files(vec![], 2);
589+
let chunks = split_files(FileGroup::default(), 2);
590590
assert_eq!(0, chunks.len());
591591
}
592592

datafusion/core/src/datasource/file_format/arrow.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ impl DisplayAs for ArrowFileSink {
297297
match t {
298298
DisplayFormatType::Default | DisplayFormatType::Verbose => {
299299
write!(f, "ArrowFileSink(file_groups=",)?;
300-
FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
300+
FileGroupDisplay(&self.config.file_group).fmt_as(t, f)?;
301301
write!(f, ")")
302302
}
303303
DisplayFormatType::TreeRender => {

datafusion/core/src/datasource/file_format/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ pub(crate) mod test_util {
4040

4141
use datafusion_catalog::Session;
4242
use datafusion_common::Result;
43+
use datafusion_datasource::file_groups::FileGroup;
4344
use datafusion_datasource::{
4445
file_format::FileFormat, file_scan_config::FileScanConfig, PartitionedFile,
4546
};
@@ -66,14 +67,14 @@ pub(crate) mod test_util {
6667
.infer_stats(state, &store, file_schema.clone(), &meta)
6768
.await?;
6869

69-
let file_groups = vec![vec![PartitionedFile {
70+
let file_groups = vec![FileGroup::new(vec![PartitionedFile {
7071
object_meta: meta,
7172
partition_values: vec![],
7273
range: None,
7374
statistics: None,
7475
extensions: None,
7576
metadata_size_hint: None,
76-
}]];
77+
}])];
7778

7879
let exec = format
7980
.create_physical_plan(

datafusion/core/src/datasource/file_format/parquet.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ mod tests {
147147
};
148148
use arrow::datatypes::{DataType, Field};
149149
use async_trait::async_trait;
150+
use datafusion_datasource::file_groups::FileGroup;
150151
use futures::stream::BoxStream;
151152
use futures::{Stream, StreamExt};
152153
use log::error;
@@ -1375,7 +1376,7 @@ mod tests {
13751376
let file_sink_config = FileSinkConfig {
13761377
original_url: String::default(),
13771378
object_store_url: object_store_url.clone(),
1378-
file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
1379+
file_group: FileGroup::new(vec![PartitionedFile::new("/tmp".to_string(), 1)]),
13791380
table_paths: vec![ListingTableUrl::parse(table_path)?],
13801381
output_schema: schema.clone(),
13811382
table_partition_cols: vec![],
@@ -1461,7 +1462,7 @@ mod tests {
14611462
let file_sink_config = FileSinkConfig {
14621463
original_url: String::default(),
14631464
object_store_url: object_store_url.clone(),
1464-
file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
1465+
file_group: FileGroup::new(vec![PartitionedFile::new("/tmp".to_string(), 1)]),
14651466
table_paths: vec![ListingTableUrl::parse("file:///")?],
14661467
output_schema: schema.clone(),
14671468
table_partition_cols: vec![("a".to_string(), DataType::Utf8)], // add partitioning
@@ -1545,7 +1546,10 @@ mod tests {
15451546
let file_sink_config = FileSinkConfig {
15461547
original_url: String::default(),
15471548
object_store_url: object_store_url.clone(),
1548-
file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
1549+
file_group: FileGroup::new(vec![PartitionedFile::new(
1550+
"/tmp".to_string(),
1551+
1,
1552+
)]),
15491553
table_paths: vec![ListingTableUrl::parse("file:///")?],
15501554
output_schema: schema.clone(),
15511555
table_partition_cols: vec![],

datafusion/core/src/datasource/listing/table.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ use datafusion_physical_expr::{
5555

5656
use async_trait::async_trait;
5757
use datafusion_catalog::Session;
58+
use datafusion_datasource::file_groups::FileGroup;
5859
use datafusion_physical_expr_common::sort_expr::LexRequirement;
5960
use futures::{future, stream, StreamExt, TryStreamExt};
6061
use itertools::Itertools;
@@ -1031,7 +1032,7 @@ impl TableProvider for ListingTable {
10311032
)
10321033
.await?;
10331034

1034-
let file_groups = file_list_stream.try_collect::<Vec<_>>().await?;
1035+
let file_group = FileGroup::new(file_list_stream.try_collect::<Vec<_>>().await?);
10351036
let keep_partition_by_columns =
10361037
state.config_options().execution.keep_partition_by_columns;
10371038

@@ -1040,7 +1041,7 @@ impl TableProvider for ListingTable {
10401041
original_url: String::default(),
10411042
object_store_url: self.table_paths()[0].object_store(),
10421043
table_paths: self.table_paths().clone(),
1043-
file_groups,
1044+
file_group,
10441045
output_schema: self.schema(),
10451046
table_partition_cols: self.options.table_partition_cols.clone(),
10461047
insert_op,
@@ -1088,7 +1089,7 @@ impl ListingTable {
10881089
ctx: &'a dyn Session,
10891090
filters: &'a [Expr],
10901091
limit: Option<usize>,
1091-
) -> Result<(Vec<Vec<PartitionedFile>>, Statistics)> {
1092+
) -> Result<(Vec<FileGroup>, Statistics)> {
10921093
let store = if let Some(url) = self.table_paths.first() {
10931094
ctx.runtime_env().object_store(url)?
10941095
} else {

datafusion/core/src/datasource/physical_plan/arrow_file.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
use std::any::Any;
2121
use std::sync::Arc;
2222

23-
use crate::datasource::listing::PartitionedFile;
2423
use crate::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener};
2524
use crate::error::Result;
2625

@@ -42,6 +41,7 @@ use datafusion_physical_plan::{
4241
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
4342
};
4443

44+
use datafusion_datasource::file_groups::FileGroup;
4545
use futures::StreamExt;
4646
use itertools::Itertools;
4747
use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore};
@@ -124,7 +124,7 @@ impl ArrowExec {
124124
)
125125
}
126126

127-
fn with_file_groups(mut self, file_groups: Vec<Vec<PartitionedFile>>) -> Self {
127+
fn with_file_groups(mut self, file_groups: Vec<FileGroup>) -> Self {
128128
self.base_config.file_groups = file_groups.clone();
129129
let mut file_source = self.file_scan_config();
130130
file_source = file_source.with_file_groups(file_groups);

datafusion/core/src/datasource/physical_plan/json.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ mod tests {
4040
use datafusion_common::{assert_batches_eq, assert_batches_sorted_eq};
4141
use datafusion_datasource::file_compression_type::FileCompressionType;
4242
use datafusion_datasource::file_format::FileFormat;
43-
use datafusion_datasource::PartitionedFile;
4443
use datafusion_datasource_json::JsonFormat;
4544
use datafusion_execution::config::SessionConfig;
4645
use datafusion_execution::object_store::ObjectStoreUrl;
@@ -49,6 +48,7 @@ mod tests {
4948
use arrow::array::Array;
5049
use arrow::datatypes::SchemaRef;
5150
use arrow::datatypes::{Field, SchemaBuilder};
51+
use datafusion_datasource::file_groups::FileGroup;
5252
use object_store::chunked::ChunkedStore;
5353
use object_store::local::LocalFileSystem;
5454
use object_store::ObjectStore;
@@ -62,7 +62,7 @@ mod tests {
6262
state: &SessionState,
6363
file_compression_type: FileCompressionType,
6464
work_dir: &Path,
65-
) -> (ObjectStoreUrl, Vec<Vec<PartitionedFile>>, SchemaRef) {
65+
) -> (ObjectStoreUrl, Vec<FileGroup>, SchemaRef) {
6666
let store_url = ObjectStoreUrl::local_filesystem();
6767
let store = state.runtime_env().object_store(&store_url).unwrap();
6868

@@ -79,6 +79,7 @@ mod tests {
7979
let meta = file_groups
8080
.first()
8181
.unwrap()
82+
.files
8283
.first()
8384
.unwrap()
8485
.clone()
@@ -113,6 +114,7 @@ mod tests {
113114
let path = file_groups
114115
.first()
115116
.unwrap()
117+
.files
116118
.first()
117119
.unwrap()
118120
.object_meta
@@ -560,6 +562,7 @@ mod tests {
560562
let path = file_groups
561563
.first()
562564
.unwrap()
565+
.files
563566
.first()
564567
.unwrap()
565568
.object_meta

datafusion/core/src/datasource/physical_plan/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ pub use csv::{CsvExec, CsvExecBuilder};
5353

5454
pub use csv::{CsvOpener, CsvSource};
5555
pub use datafusion_datasource::file::FileSource;
56+
pub use datafusion_datasource::file_groups::FileGroup;
5657
pub use datafusion_datasource::file_groups::FileGroupPartitioner;
5758
pub use datafusion_datasource::file_meta::FileMeta;
5859
pub use datafusion_datasource::file_scan_config::{

datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ mod tests {
6767
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
6868

6969
use chrono::{TimeZone, Utc};
70+
use datafusion_datasource::file_groups::FileGroup;
7071
use futures::StreamExt;
7172
use object_store::local::LocalFileSystem;
7273
use object_store::path::Path;
@@ -1123,7 +1124,7 @@ mod tests {
11231124

11241125
async fn assert_parquet_read(
11251126
state: &SessionState,
1126-
file_groups: Vec<Vec<PartitionedFile>>,
1127+
file_groups: Vec<FileGroup>,
11271128
expected_row_num: Option<usize>,
11281129
file_schema: SchemaRef,
11291130
) -> Result<()> {
@@ -1166,12 +1167,12 @@ mod tests {
11661167
.infer_schema(&state, &store, std::slice::from_ref(&meta))
11671168
.await?;
11681169

1169-
let group_empty = vec![vec![file_range(&meta, 0, 2)]];
1170-
let group_contain = vec![vec![file_range(&meta, 2, i64::MAX)]];
1171-
let group_all = vec![vec![
1170+
let group_empty = vec![FileGroup::new(vec![file_range(&meta, 0, 2)])];
1171+
let group_contain = vec![FileGroup::new(vec![file_range(&meta, 2, i64::MAX)])];
1172+
let group_all = vec![FileGroup::new(vec![
11721173
file_range(&meta, 0, 2),
11731174
file_range(&meta, 2, i64::MAX),
1174-
]];
1175+
])];
11751176

11761177
assert_parquet_read(&state, group_empty, None, file_schema.clone()).await?;
11771178
assert_parquet_read(&state, group_contain, Some(8), file_schema.clone()).await?;

datafusion/core/src/datasource/statistics.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@ use std::sync::Arc;
2020

2121
use futures::{Stream, StreamExt};
2222

23-
use datafusion_common::stats::Precision;
24-
use datafusion_common::ScalarValue;
25-
2623
use crate::arrow::datatypes::SchemaRef;
2724
use crate::error::Result;
2825
use crate::physical_plan::{ColumnStatistics, Statistics};
26+
use datafusion_common::stats::Precision;
27+
use datafusion_common::ScalarValue;
28+
use datafusion_datasource::file_groups::FileGroup;
2929

3030
use super::listing::PartitionedFile;
3131

@@ -39,8 +39,8 @@ pub async fn get_statistics_with_limit(
3939
file_schema: SchemaRef,
4040
limit: Option<usize>,
4141
collect_stats: bool,
42-
) -> Result<(Vec<PartitionedFile>, Statistics)> {
43-
let mut result_files = vec![];
42+
) -> Result<(FileGroup, Statistics)> {
43+
let mut result_files = FileGroup::default();
4444
// These statistics can be calculated as long as at least one file provides
4545
// useful information. If none of the files provides any information, then
4646
// they will end up having `Precision::Absent` values. Throughout calculations,

datafusion/core/src/physical_planner.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ use datafusion_physical_plan::unnest::ListUnnest;
9090

9191
use crate::schema_equivalence::schema_satisfied_by;
9292
use async_trait::async_trait;
93+
use datafusion_datasource::file_groups::FileGroup;
9394
use futures::{StreamExt, TryStreamExt};
9495
use itertools::{multiunzip, Itertools};
9596
use log::{debug, trace};
@@ -532,7 +533,7 @@ impl DefaultPhysicalPlanner {
532533
original_url,
533534
object_store_url,
534535
table_paths: vec![parsed_url],
535-
file_groups: vec![],
536+
file_group: FileGroup::default(),
536537
output_schema: Arc::new(schema),
537538
table_partition_cols,
538539
insert_op: InsertOp::Append,

0 commit comments

Comments
 (0)