Skip to content

Commit d1d9f7b

Browse files
alamb authored and findepi committed
Refactor parquet row group pruning into a struct (use new statistics API, part 1) (apache#10607)
* Refactor parquet row group pruning into a struct
* Port tests
* improve docs
* fix msrv
1 parent f2e8385 commit d1d9f7b

File tree

4 files changed

+366
-282
lines changed

4 files changed

+366
-282
lines changed

datafusion/core/src/datasource/listing/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -48,6 +48,13 @@ pub struct FileRange {
4848
pub end: i64,
4949
}
5050

51+
impl FileRange {
52+
/// returns true if this file range contains the specified offset
53+
pub fn contains(&self, offset: i64) -> bool {
54+
offset >= self.start && offset < self.end
55+
}
56+
}
57+
5158
#[derive(Debug, Clone)]
5259
/// A single file or part of a file that should be read, along with its schema, statistics
5360
/// and partition column values that need to be appended to each row.

datafusion/core/src/datasource/physical_plan/parquet/mod.rs

Lines changed: 29 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -70,6 +70,7 @@ mod row_groups;
7070
mod schema_adapter;
7171
mod statistics;
7272

73+
use crate::datasource::physical_plan::parquet::row_groups::RowGroupSet;
7374
pub use metrics::ParquetFileMetrics;
7475
pub use schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
7576
pub use statistics::{RequestedStatistics, StatisticsConverter};
@@ -556,32 +557,36 @@ impl FileOpener for ParquetOpener {
556557
};
557558
};
558559

559-
// Row group pruning by statistics: attempt to skip entire row_groups
560-
// using metadata on the row groups
560+
// Determine which row groups to actually read. The idea is to skip
561+
// as many row groups as possible based on the metadata and query
561562
let file_metadata = builder.metadata().clone();
562563
let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
563-
let mut row_groups = row_groups::prune_row_groups_by_statistics(
564-
&file_schema,
565-
builder.parquet_schema(),
566-
file_metadata.row_groups(),
567-
file_range,
568-
predicate,
569-
&file_metrics,
570-
);
564+
let rg_metadata = file_metadata.row_groups();
565+
// track which row groups to actually read
566+
let mut row_groups = RowGroupSet::new(rg_metadata.len());
567+
// if there is a range restricting what parts of the file to read
568+
if let Some(range) = file_range.as_ref() {
569+
row_groups.prune_by_range(rg_metadata, range);
570+
}
571+
// If there is a predicate that can be evaluated against the metadata
572+
if let Some(predicate) = predicate.as_ref() {
573+
row_groups.prune_by_statistics(
574+
&file_schema,
575+
builder.parquet_schema(),
576+
rg_metadata,
577+
predicate,
578+
&file_metrics,
579+
);
571580

572-
// Bloom filter pruning: if bloom filters are enabled and then attempt to skip entire row_groups
573-
// using bloom filters on the row groups
574-
if enable_bloom_filter && !row_groups.is_empty() {
575-
if let Some(predicate) = predicate {
576-
row_groups = row_groups::prune_row_groups_by_bloom_filters(
577-
&file_schema,
578-
&mut builder,
579-
&row_groups,
580-
file_metadata.row_groups(),
581-
predicate,
582-
&file_metrics,
583-
)
584-
.await;
581+
if enable_bloom_filter && !row_groups.is_empty() {
582+
row_groups
583+
.prune_by_bloom_filters(
584+
&file_schema,
585+
&mut builder,
586+
predicate,
587+
&file_metrics,
588+
)
589+
.await;
585590
}
586591
}
587592

@@ -610,7 +615,7 @@ impl FileOpener for ParquetOpener {
610615
let stream = builder
611616
.with_projection(mask)
612617
.with_batch_size(batch_size)
613-
.with_row_groups(row_groups)
618+
.with_row_groups(row_groups.indexes())
614619
.build()?;
615620

616621
let adapted = stream

datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs

Lines changed: 8 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -42,6 +42,7 @@ use std::collections::HashSet;
4242
use std::sync::Arc;
4343

4444
use crate::datasource::physical_plan::parquet::parquet_to_arrow_decimal_type;
45+
use crate::datasource::physical_plan::parquet::row_groups::RowGroupSet;
4546
use crate::datasource::physical_plan::parquet::statistics::{
4647
from_bytes_to_i128, parquet_column,
4748
};
@@ -99,7 +100,7 @@ use super::metrics::ParquetFileMetrics;
99100
///
100101
/// Using `A > 35`: can rule out all of values in Page 1 (rows 0 -> 199)
101102
///
102-
/// Using `B = 'F'`: can rule out all vaues in Page 3 and Page 5 (rows 0 -> 99, and 250 -> 299)
103+
/// Using `B = 'F'`: can rule out all values in Page 3 and Page 5 (rows 0 -> 99, and 250 -> 299)
103104
///
104105
/// So we can entirely skip rows 0->199 and 250->299 as we know they
105106
/// can not contain rows that match the predicate.
@@ -133,7 +134,7 @@ impl PagePruningPredicate {
133134
&self,
134135
arrow_schema: &Schema,
135136
parquet_schema: &SchemaDescriptor,
136-
row_groups: &[usize],
137+
row_groups: &RowGroupSet,
137138
file_metadata: &ParquetMetaData,
138139
file_metrics: &ParquetFileMetrics,
139140
) -> Result<Option<RowSelection>> {
@@ -172,10 +173,10 @@ impl PagePruningPredicate {
172173
let col_idx = find_column_index(predicate, arrow_schema, parquet_schema);
173174
let mut selectors = Vec::with_capacity(row_groups.len());
174175
for r in row_groups.iter() {
175-
let row_group_metadata = &groups[*r];
176+
let row_group_metadata = &groups[r];
176177

177-
let rg_offset_indexes = file_offset_indexes.get(*r);
178-
let rg_page_indexes = file_page_indexes.get(*r);
178+
let rg_offset_indexes = file_offset_indexes.get(r);
179+
let rg_page_indexes = file_page_indexes.get(r);
179180
if let (Some(rg_page_indexes), Some(rg_offset_indexes), Some(col_idx)) =
180181
(rg_page_indexes, rg_offset_indexes, col_idx)
181182
{
@@ -185,7 +186,7 @@ impl PagePruningPredicate {
185186
predicate,
186187
rg_offset_indexes.get(col_idx),
187188
rg_page_indexes.get(col_idx),
188-
groups[*r].column(col_idx).column_descr(),
189+
groups[r].column(col_idx).column_descr(),
189190
file_metrics,
190191
)
191192
.map_err(|e| {
@@ -201,7 +202,7 @@ impl PagePruningPredicate {
201202
);
202203
// fallback select all rows
203204
let all_selected =
204-
vec![RowSelector::select(groups[*r].num_rows() as usize)];
205+
vec![RowSelector::select(groups[r].num_rows() as usize)];
205206
selectors.push(all_selected);
206207
}
207208
}

0 commit comments

Comments
 (0)