Skip to content

Commit 147c7a7

Browse files
committed
Sketch out cached filter result API
1 parent 57d008d commit 147c7a7

File tree

8 files changed

+530
-38
lines changed

8 files changed

+530
-38
lines changed

parquet/src/arrow/array_reader/builder.rs

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,7 @@ use arrow_schema::{DataType, Fields, SchemaBuilder};
2222
use crate::arrow::array_reader::byte_view_array::make_byte_view_array_reader;
2323
use crate::arrow::array_reader::empty_array::make_empty_array_reader;
2424
use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
25-
use crate::arrow::array_reader::{
26-
make_byte_array_dictionary_reader, make_byte_array_reader, ArrayReader,
27-
FixedSizeListArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
28-
PrimitiveArrayReader, RowGroups, StructArrayReader,
29-
};
25+
use crate::arrow::array_reader::{make_byte_array_dictionary_reader, make_byte_array_reader, ArrayReader, CachedPredicateResult, FixedSizeListArrayReader, ListArrayReader, MapArrayReader, NullArrayReader, PrimitiveArrayReader, RowGroups, StructArrayReader};
3026
use crate::arrow::schema::{ParquetField, ParquetFieldType};
3127
use crate::arrow::ProjectionMask;
3228
use crate::basic::Type as PhysicalType;
@@ -37,12 +33,12 @@ use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
3733
/// Builds array reader from parquet schema, projection mask, and parquet file reader.
3834
pub(crate) struct ArrayReaderBuilder<'a> {
3935
row_groups: &'a dyn RowGroups,
40-
// todo add cached predicate results
36+
cached_predicate_result: Option<&'a CachedPredicateResult>,
4137
}
4238

4339
impl<'a> ArrayReaderBuilder<'a> {
44-
pub(crate) fn new(row_groups: &'a dyn RowGroups) -> Self {
45-
Self { row_groups }
40+
pub(crate) fn new(row_groups: &'a dyn RowGroups, cached_predicate_result: Option<&'a CachedPredicateResult>) -> Self {
41+
Self { row_groups, cached_predicate_result }
4642
}
4743

4844
/// Create [`ArrayReader`] from parquet schema, projection mask, and parquet file reader.
@@ -64,11 +60,16 @@ impl<'a> ArrayReaderBuilder<'a> {
6460
self.row_groups.num_rows()
6561
}
6662

63+
6764
fn build_reader(
6865
&self,
6966
field: &ParquetField,
7067
mask: &ProjectionMask,
7168
) -> Result<Option<Box<dyn ArrayReader>>> {
69+
if let Some(builder) = self.build_cached_reader(field, mask)? {
70+
return Ok(Some(builder));
71+
}
72+
7273
match field.field_type {
7374
ParquetFieldType::Primitive { .. } => self.build_primitive_reader(field, mask),
7475
ParquetFieldType::Group { .. } => match &field.arrow_type {
@@ -80,6 +81,29 @@ impl<'a> ArrayReaderBuilder<'a> {
8081
d => unimplemented!("reading group type {} not implemented", d),
8182
},
8283
}
84+
}
85+
86+
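/// Builds an array reader from the cached predicate result; returns `Ok(None)` when no cache is configured, the field is not a primitive column, or the column is not included in the projection mask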
87+
fn build_cached_reader(
88+
&self,
89+
field: &ParquetField,
90+
mask: &ProjectionMask,
91+
) -> Result<Option<Box<dyn ArrayReader>>> {
92+
let Some(cached_predicate_result) = self.cached_predicate_result else {
93+
return Ok(None);
94+
};
95+
96+
// TODO: figure out how to look up cached results for struct / list fields
97+
// (Probably have to cache the individual fields)
98+
let ParquetFieldType::Primitive { col_idx, primitive_type: _} = &field.field_type else {
99+
return Ok(None);
100+
};
101+
102+
if !mask.leaf_included(*col_idx) {
103+
return Ok(None);
104+
}
105+
106+
cached_predicate_result.build_reader(*col_idx)
83107
}
84108

85109
/// Build array reader for map type.
@@ -376,7 +400,8 @@ mod tests {
376400
)
377401
.unwrap();
378402

379-
let array_reader = ArrayReaderBuilder::new(&file_reader)
403+
let cached_predicate_result = None;
404+
let array_reader = ArrayReaderBuilder::new(&file_reader, cached_predicate_result)
380405
.build_array_reader(fields.as_ref(), &mask)
381406
.unwrap();
382407

0 commit comments

Comments
 (0)