Skip to content

Commit f1f7103

Browse files
committed
Sketch out cached filter result API
1 parent 57d008d commit f1f7103

File tree

8 files changed

+546
-34
lines changed

8 files changed

+546
-34
lines changed

parquet/src/arrow/array_reader/builder.rs

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use crate::arrow::array_reader::byte_view_array::make_byte_view_array_reader;
2323
use crate::arrow::array_reader::empty_array::make_empty_array_reader;
2424
use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
2525
use crate::arrow::array_reader::{
26-
make_byte_array_dictionary_reader, make_byte_array_reader, ArrayReader,
26+
make_byte_array_dictionary_reader, make_byte_array_reader, ArrayReader, CachedPredicateResult,
2727
FixedSizeListArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
2828
PrimitiveArrayReader, RowGroups, StructArrayReader,
2929
};
@@ -37,12 +37,18 @@ use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
3737
/// Builds array reader from parquet schema, projection mask, and parquet file reader.
3838
pub(crate) struct ArrayReaderBuilder<'a> {
3939
row_groups: &'a dyn RowGroups,
40-
// todo add cached predicate results
40+
cached_predicate_result: Option<&'a CachedPredicateResult>,
4141
}
4242

4343
impl<'a> ArrayReaderBuilder<'a> {
44-
pub(crate) fn new(row_groups: &'a dyn RowGroups) -> Self {
45-
Self { row_groups }
44+
pub(crate) fn new(
45+
row_groups: &'a dyn RowGroups,
46+
cached_predicate_result: Option<&'a CachedPredicateResult>,
47+
) -> Self {
48+
Self {
49+
row_groups,
50+
cached_predicate_result,
51+
}
4652
}
4753

4854
/// Create [`ArrayReader`] from parquet schema, projection mask, and parquet file reader.
@@ -69,6 +75,10 @@ impl<'a> ArrayReaderBuilder<'a> {
6975
field: &ParquetField,
7076
mask: &ProjectionMask,
7177
) -> Result<Option<Box<dyn ArrayReader>>> {
78+
if let Some(builder) = self.build_cached_reader(field, mask)? {
79+
return Ok(Some(builder));
80+
}
81+
7282
match field.field_type {
7383
ParquetFieldType::Primitive { .. } => self.build_primitive_reader(field, mask),
7484
ParquetFieldType::Group { .. } => match &field.arrow_type {
@@ -82,6 +92,33 @@ impl<'a> ArrayReaderBuilder<'a> {
8292
}
8393
}
8494

95+
/// Build cached array reader if the field is in the projection mask and in the cache
96+
fn build_cached_reader(
97+
&self,
98+
field: &ParquetField,
99+
mask: &ProjectionMask,
100+
) -> Result<Option<Box<dyn ArrayReader>>> {
101+
let Some(cached_predicate_result) = self.cached_predicate_result else {
102+
return Ok(None);
103+
};
104+
105+
// TODO how to find a cached struct / list
106+
// (Probably have to cache the individual fields)
107+
let ParquetFieldType::Primitive {
108+
col_idx,
109+
primitive_type: _,
110+
} = &field.field_type
111+
else {
112+
return Ok(None);
113+
};
114+
115+
if !mask.leaf_included(*col_idx) {
116+
return Ok(None);
117+
}
118+
119+
cached_predicate_result.build_reader(*col_idx)
120+
}
121+
85122
/// Build array reader for map type.
86123
fn build_map_reader(
87124
&self,
@@ -376,7 +413,8 @@ mod tests {
376413
)
377414
.unwrap();
378415

379-
let array_reader = ArrayReaderBuilder::new(&file_reader)
416+
let cached_predicate_result = None;
417+
let array_reader = ArrayReaderBuilder::new(&file_reader, cached_predicate_result)
380418
.build_array_reader(fields.as_ref(), &mask)
381419
.unwrap();
382420

0 commit comments

Comments
 (0)