-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Initial Extract parquet data page statistics API #10852
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
ed6a0f3
e4729e8
5e6280a
5293d29
c771504
72a2d4a
6cedd47
3e2ffef
596913a
23f1430
8ccfe89
fff96c4
5ade0be
a302f4e
96c99ce
d731f44
307e2ef
6f7e856
8d1b99c
830f662
d25dd9a
a381407
a5b6b9b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,7 +33,8 @@ use arrow_array::{ | |
use arrow_schema::{Field, FieldRef, Schema, TimeUnit}; | ||
use datafusion_common::{internal_datafusion_err, internal_err, plan_err, Result}; | ||
use half::f16; | ||
use parquet::file::metadata::RowGroupMetaData; | ||
use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData}; | ||
use parquet::file::page_index::index::Index; | ||
use parquet::file::statistics::Statistics as ParquetStatistics; | ||
use parquet::schema::types::SchemaDescriptor; | ||
use paste::paste; | ||
|
@@ -517,6 +518,74 @@ macro_rules! get_statistics { | |
}}} | ||
} | ||
|
||
macro_rules! make_data_page_stats_iterator { | ||
($iterator_type: ident, $func: ident, $index_type: path, $stat_value_type: ty) => { | ||
struct $iterator_type<'a, I> | ||
where | ||
I: Iterator<Item = (usize, &'a Index)>, | ||
{ | ||
iter: I, | ||
} | ||
|
||
impl<'a, I> $iterator_type<'a, I> | ||
where | ||
I: Iterator<Item = (usize, &'a Index)>, | ||
{ | ||
fn new(iter: I) -> Self { | ||
Self { iter } | ||
} | ||
} | ||
|
||
impl<'a, I> Iterator for $iterator_type<'a, I> | ||
where | ||
I: Iterator<Item = (usize, &'a Index)>, | ||
{ | ||
type Item = Vec<Option<$stat_value_type>>; | ||
|
||
fn next(&mut self) -> Option<Self::Item> { | ||
let next = self.iter.next(); | ||
match next { | ||
Some((len, index)) => match index { | ||
$index_type(native_index) => Some( | ||
native_index | ||
.indexes | ||
.iter() | ||
.map(|x| x.$func) | ||
.collect::<Vec<_>>(), | ||
), | ||
// No matching `Index` found; | ||
// thus no statistics that can be extracted. | ||
// We return vec![None; len] to effectively | ||
// create an arrow null-array with the length | ||
// corresponding to the number of entries in | ||
// `ParquetOffsetIndex` per row group per column. | ||
_ => Some(vec![None; len]), | ||
}, | ||
_ => None, | ||
} | ||
} | ||
|
||
fn size_hint(&self) -> (usize, Option<usize>) { | ||
self.iter.size_hint() | ||
} | ||
} | ||
}; | ||
} | ||
|
||
make_data_page_stats_iterator!(MinInt64DataPageStatsIterator, min, Index::INT64, i64); | ||
make_data_page_stats_iterator!(MaxInt64DataPageStatsIterator, max, Index::INT64, i64); | ||
|
||
/// Dispatches on the requested arrow [`DataType`] to the matching
/// `make_data_page_stats_iterator!`-generated extractor.
///
/// `$stat_type_prefix` is `Min` or `Max`, `$data_type` is an
/// `Option<&DataType>`, and `$iterator` yields `(usize, &Index)` pairs.
macro_rules! get_data_page_statistics {
    ($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => {
        paste! {
            match $data_type {
                Some(DataType::Int64) => Ok(Arc::new(Int64Array::from_iter([<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten()))),
                // Return a recoverable internal error rather than
                // panicking via `unimplemented!` -- this macro expands
                // inside functions that already return `Result`, and a
                // library API should not abort the process for a data
                // type that is simply not supported yet.
                _ => internal_err!(
                    "Data page statistics not yet supported for data type {:?}",
                    $data_type
                ),
            }
        }
    }
}
|
||
/// Lookups up the parquet column by name | ||
/// | ||
/// Returns the parquet column index and the corresponding arrow field | ||
|
@@ -563,6 +632,51 @@ fn max_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>( | |
get_statistics!(Max, data_type, iterator) | ||
} | ||
|
||
/// Extracts the min statistics from an iterator
/// of parquet page [`Index`]'es to an [`ArrayRef`]
///
/// Each input item is `(num_data_pages, &Index)` for one row group /
/// column; the returned array holds one element per data page.
pub(crate) fn min_page_statistics<'a, I>(
    data_type: Option<&DataType>,
    iterator: I,
) -> Result<ArrayRef>
where
    I: Iterator<Item = (usize, &'a Index)>,
{
    get_data_page_statistics!(Min, data_type, iterator)
}
|
||
/// Extracts the max statistics from an iterator
/// of parquet page [`Index`]'es to an [`ArrayRef`]
///
/// Each input item is `(num_data_pages, &Index)` for one row group /
/// column; the returned array holds one element per data page.
pub(crate) fn max_page_statistics<'a, I>(
    data_type: Option<&DataType>,
    iterator: I,
) -> Result<ArrayRef>
where
    I: Iterator<Item = (usize, &'a Index)>,
{
    get_data_page_statistics!(Max, data_type, iterator)
}
|
||
/// Extracts the null count statistics from an iterator | ||
/// of parquet page [`Index`]'es to an [`ArrayRef`] | ||
/// | ||
/// The returned Array is an [`UInt64Array`] | ||
pub(crate) fn null_counts_page_statistics<'a, I>(iterator: I) -> Result<ArrayRef> | ||
where | ||
I: Iterator<Item = (usize, &'a Index)>, | ||
{ | ||
let iter = iterator.flat_map(|(len, index)| match index { | ||
Index::NONE => vec![None; len], | ||
Index::INT64(native_index) => native_index | ||
.indexes | ||
.iter() | ||
.map(|x| x.null_count.map(|x| x as u64)) | ||
.collect::<Vec<_>>(), | ||
_ => unimplemented!(), | ||
}); | ||
|
||
Ok(Arc::new(UInt64Array::from_iter(iter))) | ||
} | ||
|
||
/// Extracts Parquet statistics as Arrow arrays | ||
/// | ||
/// This is used to convert Parquet statistics to Arrow arrays, with proper type | ||
|
@@ -771,10 +885,205 @@ impl<'a> StatisticsConverter<'a> { | |
Ok(Arc::new(UInt64Array::from_iter(null_counts))) | ||
} | ||
|
||
/// Extract the minimum values from Data Page statistics. | ||
/// | ||
/// In Parquet files, in addition to the Column Chunk level statistics | ||
/// (stored for each column for each row group) there are also | ||
/// optional statistics stored for each data page, as part of | ||
/// the [`ParquetColumnIndex`]. | ||
/// | ||
/// Since a single Column Chunk is stored as one or more pages, | ||
/// page level statistics can prune at a finer granularity. | ||
/// | ||
/// However since they are stored in a separate metadata | ||
/// structure ([`Index`]) there is different code to extract them as | ||
/// compared to arrow statistics. | ||
/// | ||
/// # Parameters: | ||
/// | ||
/// * `column_page_index`: The parquet column page indices, read from | ||
/// `ParquetMetaData` column_index | ||
/// | ||
/// * `column_offset_index`: The parquet column offset indices, read from | ||
/// `ParquetMetaData` offset_index | ||
/// | ||
/// * `row_group_indices`: The indices of the row groups, that are used to | ||
/// extract the column page index and offset index on a per row group | ||
/// per column basis. | ||
/// | ||
/// # Return Value | ||
/// | ||
/// The returned array contains 1 value for each `NativeIndex` | ||
/// in the underlying `Index`es, in the same order as they appear | ||
/// in `metadatas`. | ||
/// | ||
/// For example, if there are two `Index`es in `metadatas`: | ||
/// 1. the first having `3` `PageIndex` entries | ||
/// 2. the second having `2` `PageIndex` entries | ||
/// | ||
/// The returned array would have 5 rows. | ||
/// | ||
/// Each value is either: | ||
/// * the minimum value for the page | ||
/// * a null value, if the statistics can not be extracted | ||
/// | ||
/// Note that a null value does NOT mean the min value was actually | ||
/// `null` it means it the requested statistic is unknown | ||
/// | ||
/// # Errors | ||
/// | ||
/// Reasons for not being able to extract the statistics include: | ||
/// * the column is not present in the parquet file | ||
/// * statistics for the pages are not present in the row group | ||
/// * the stored statistic value can not be converted to the requested type | ||
pub fn data_page_mins<I>( | ||
&self, | ||
column_page_index: &ParquetColumnIndex, | ||
column_offset_index: &ParquetOffsetIndex, | ||
row_group_indices: I, | ||
) -> Result<ArrayRef> | ||
where | ||
I: IntoIterator<Item = &'a usize>, | ||
{ | ||
let data_type = self.arrow_field.data_type(); | ||
|
||
let Some(parquet_index) = self.parquet_index else { | ||
return Ok(self.make_null_array(data_type, row_group_indices)); | ||
}; | ||
|
||
let iter = row_group_indices.into_iter().map(|rg_index| { | ||
let column_page_index_per_row_group_per_column = | ||
&column_page_index[*rg_index][parquet_index]; | ||
let num_data_pages = &column_offset_index[*rg_index][parquet_index].len(); | ||
|
||
(*num_data_pages, column_page_index_per_row_group_per_column) | ||
}); | ||
|
||
min_page_statistics(Some(data_type), iter) | ||
} | ||
|
||
/// Extract the maximum values from Data Page statistics. | ||
/// | ||
/// See docs on [`Self::data_page_mins`] for details. | ||
pub fn data_page_maxes<I>( | ||
&self, | ||
column_page_index: &ParquetColumnIndex, | ||
column_offset_index: &ParquetOffsetIndex, | ||
row_group_indices: I, | ||
) -> Result<ArrayRef> | ||
where | ||
I: IntoIterator<Item = &'a usize>, | ||
{ | ||
let data_type = self.arrow_field.data_type(); | ||
|
||
let Some(parquet_index) = self.parquet_index else { | ||
return Ok(self.make_null_array(data_type, row_group_indices)); | ||
}; | ||
|
||
let iter = row_group_indices.into_iter().map(|rg_index| { | ||
let column_page_index_per_row_group_per_column = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. wow those structures are hard to use 🤯 -- seems like having an accessor would help a lot. Something to consider upstream maybe There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this makes a lot of sense. |
||
&column_page_index[*rg_index][parquet_index]; | ||
let num_data_pages = &column_offset_index[*rg_index][parquet_index].len(); | ||
|
||
(*num_data_pages, column_page_index_per_row_group_per_column) | ||
}); | ||
|
||
max_page_statistics(Some(data_type), iter) | ||
} | ||
|
||
/// Extract the null counts from Data Page statistics. | ||
/// | ||
/// The returned Array is an [`UInt64Array`] | ||
/// | ||
/// See docs on [`Self::data_page_mins`] for details. | ||
pub fn data_page_null_counts<I>( | ||
&self, | ||
column_page_index: &ParquetColumnIndex, | ||
column_offset_index: &ParquetOffsetIndex, | ||
row_group_indices: I, | ||
) -> Result<ArrayRef> | ||
where | ||
I: IntoIterator<Item = &'a usize>, | ||
{ | ||
let data_type = self.arrow_field.data_type(); | ||
|
||
let Some(parquet_index) = self.parquet_index else { | ||
return Ok(self.make_null_array(data_type, row_group_indices)); | ||
}; | ||
|
||
let iter = row_group_indices.into_iter().map(|rg_index| { | ||
let column_page_index_per_row_group_per_column = | ||
&column_page_index[*rg_index][parquet_index]; | ||
let num_data_pages = &column_offset_index[*rg_index][parquet_index].len(); | ||
|
||
(*num_data_pages, column_page_index_per_row_group_per_column) | ||
}); | ||
null_counts_page_statistics(iter) | ||
} | ||
|
||
/// Returns an [`ArrayRef`] with row counts for each row group. | ||
/// | ||
/// This function iterates over the given row group indexes and computes | ||
/// the row count for each page in the specified column. | ||
/// | ||
/// # Parameters: | ||
/// | ||
/// * `column_offset_index`: The parquet column offset indices, read from | ||
/// `ParquetMetaData` offset_index | ||
/// | ||
/// * `row_group_metadatas`: The metadata slice of the row groups, read | ||
/// from `ParquetMetaData` row_groups | ||
/// | ||
/// * `row_group_indices`: The indices of the row groups, that are used to | ||
/// extract the column offset index on a per row group per column basis. | ||
/// | ||
/// See docs on [`Self::data_page_mins`] for details. | ||
pub fn data_page_row_counts<I>( | ||
&self, | ||
column_offset_index: &ParquetOffsetIndex, | ||
row_group_metadatas: &[RowGroupMetaData], | ||
row_group_indices: I, | ||
) -> Result<ArrayRef> | ||
where | ||
I: IntoIterator<Item = &'a usize>, | ||
{ | ||
let data_type = self.arrow_field.data_type(); | ||
|
||
let Some(parquet_index) = self.parquet_index else { | ||
return Ok(self.make_null_array(data_type, row_group_indices)); | ||
}; | ||
|
||
// `offset_index[row_group_number][column_number][page_number]` holds | ||
// the [`PageLocation`] corresponding to page `page_number` of column | ||
// `column_number`of row group `row_group_number`. | ||
let mut row_count_total = Vec::new(); | ||
for rg_idx in row_group_indices { | ||
let page_locations = &column_offset_index[*rg_idx][parquet_index]; | ||
|
||
let row_count_per_page = page_locations.windows(2).map(|loc| { | ||
Some(loc[1].first_row_index as u64 - loc[0].first_row_index as u64) | ||
}); | ||
|
||
let num_rows_in_row_group = &row_group_metadatas[*rg_idx].num_rows(); | ||
|
||
// append the last page row count | ||
let row_count_per_page = row_count_per_page | ||
.chain(std::iter::once(Some( | ||
*num_rows_in_row_group as u64 | ||
- page_locations.last().unwrap().first_row_index as u64, | ||
))) | ||
.collect::<Vec<_>>(); | ||
|
||
row_count_total.extend(row_count_per_page); | ||
} | ||
|
||
Ok(Arc::new(UInt64Array::from_iter(row_count_total))) | ||
} | ||
|
||
/// Returns a null array of data_type with one element per row group | ||
fn make_null_array<I>(&self, data_type: &DataType, metadatas: I) -> ArrayRef | ||
fn make_null_array<I, A>(&self, data_type: &DataType, metadatas: I) -> ArrayRef | ||
where | ||
I: IntoIterator<Item = &'a RowGroupMetaData>, | ||
I: IntoIterator<Item = A>, | ||
{ | ||
// column was in the arrow schema but not in the parquet schema, so return a null array | ||
let num_row_groups = metadatas.into_iter().count(); | ||
|
Uh oh!
There was an error while loading. Please reload this page.