Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 111 additions & 10 deletions datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@ use arrow_array::{
UInt64Array, UInt8Array,
};
use arrow_schema::{Field, FieldRef, Schema, TimeUnit};
use datafusion_common::{internal_datafusion_err, internal_err, plan_err, Result};
use datafusion_common::{
internal_datafusion_err, internal_err, not_impl_err, plan_err, Result,
};
use half::f16;
use parquet::file::metadata::RowGroupMetaData;
use parquet::file::metadata::{ParquetColumnIndex, RowGroupMetaData};
use parquet::file::page_index::index::{Index, PageIndex};
use parquet::file::statistics::Statistics as ParquetStatistics;
use parquet::schema::types::SchemaDescriptor;
use paste::paste;
Expand Down Expand Up @@ -578,7 +581,7 @@ pub(crate) fn max_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics
#[derive(Debug)]
pub struct StatisticsConverter<'a> {
/// the index of the matched column in the parquet schema
parquet_index: Option<usize>,
parquet_column_index: Option<usize>,
/// The field (with data type) of the column in the arrow schema
arrow_field: &'a Field,
}
Expand Down Expand Up @@ -666,7 +669,7 @@ impl<'a> StatisticsConverter<'a> {
};

Ok(Self {
parquet_index,
parquet_column_index: parquet_index,
arrow_field,
})
}
Expand Down Expand Up @@ -717,7 +720,7 @@ impl<'a> StatisticsConverter<'a> {
{
let data_type = self.arrow_field.data_type();

let Some(parquet_index) = self.parquet_index else {
let Some(parquet_index) = self.parquet_column_index else {
return Ok(self.make_null_array(data_type, metadatas));
};

Expand All @@ -736,7 +739,7 @@ impl<'a> StatisticsConverter<'a> {
{
let data_type = self.arrow_field.data_type();

let Some(parquet_index) = self.parquet_index else {
let Some(parquet_index) = self.parquet_column_index else {
return Ok(self.make_null_array(data_type, metadatas));
};

Expand All @@ -755,7 +758,7 @@ impl<'a> StatisticsConverter<'a> {
{
let data_type = self.arrow_field.data_type();

let Some(parquet_index) = self.parquet_index else {
let Some(parquet_index) = self.parquet_column_index else {
return Ok(self.make_null_array(data_type, metadatas));
};

Expand All @@ -766,10 +769,108 @@ impl<'a> StatisticsConverter<'a> {
Ok(Arc::new(UInt64Array::from_iter(null_counts)))
}

/// Returns a null array of data_type with one element per row group
fn make_null_array<I>(&self, data_type: &DataType, metadatas: I) -> ArrayRef
/// Extract the minimum values from Data page statistics
///
/// In Parquet files, in addition to the Column Chunk level statistics
/// (stored for each column for each row group) there are also optional
/// statistics stored for each data page, part of the [Parquet Page Index].
/// Since a single Column Chunk is stored as one or more pages, page level statistics
/// can prune at a finer granularity.
///
/// However since they are stored in a separate metadata structure
/// ([`Index`]) there is different code to extract them as arrow statistics
///
/// Parameters:
///
/// * `page_index`: The parquet page index, likely read from
/// [`ParquetMetaData::column_index()`]
///
/// * `row_group_indexes`: The indexes of the row groups (indexes in
/// `page_index`) to extract the statistics from. This is an iterator over `&usize` to
/// permit passing in `&Vec<usize>` or similar
///
/// # Return Value
///
/// The returned array contains 1 value for each `NativeIndex` in the underlying
/// `Index`es, in the same order as they are selected by `row_group_indexes`.
///
/// For example, if `row_group_indexes` selects two `Index`es:
/// 1. the first having `3` `PageIndex` entries
/// 2. the second having `2` `PageIndex` entries
///
/// The returned array would have 5 rows
///
/// Each value is either
/// * the minimum value for the page
/// * a null value, if the statistics can not be extracted
///
/// Note that a null value does NOT mean the min value was actually
/// `null`; it means the requested statistic is unknown
///
/// # Errors
///
/// Reasons for not being able to extract the statistics include:
/// * the column is not present in the parquet file
/// * statistics for the pages are not present in the row group
/// * the stored statistic value can not be converted to the requested type
///
/// # Example
/// ```no_run
/// // TODO: add an example of extracting data page minimums
/// ```
pub fn data_page_mins<I>(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is one way the API could look like

&self,
page_index: &ParquetColumnIndex,
row_group_indexes: I,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll guess one reason why we want to pass in the row_group_indexes is due to the iteration over the row_group_indexes from the access_plan here.

We cannot assume we need all indices since access_plan does filter based on should_scan() or not.
Is this correct? If it is, then this was the missing piece in my prototype.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll guess one reason why we want to pass in the row_group_indexes is due to the iteration over the row_group_indexes from the access_plan here.

We cannot assume we need all indices since access_plan does filter based on should_scan() or not. Is this correct? If it is, then this was the missing piece in my prototype.

Yes I think that is correct

I am still not super thrilled with this interface (mostly because it is different than row_group_mins etc that takes an iterator directly).

However, I couldn't figure out how to make the types workout for making an iterator over Vec ...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had this interface in some version of my prototype, where it would take an iterator over all row_groups directly. However, this cannot be easily integrated with the existing code and the access_plan.row_group_indices. Perhaps, once the StatisticsConverter is fully used in page_filter.rs we can change the interface?

) -> Result<ArrayRef>
where
I: IntoIterator<Item = &'a usize>,
{
let data_type = self.arrow_field.data_type();

let Some(parquet_column_index) = self.parquet_column_index else {
return Ok(self.make_null_array(data_type, row_group_indexes));
};

// iterator over &Index
let indexes = row_group_indexes
.into_iter()
.map(|rg_index| &page_index[*rg_index][parquet_column_index]);

// Get an iterator of the native index type depending on data type
match data_type {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to sort out how to make this look better (perhaps by following the lead of the row group iterators that make special iterators for each underlying parquet statistics datatype and then write the relevant code for converting them all to arrow

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried this approach, which works fine. I'd simply refactor what's mentioned in the comment - and extract data type specific iterators (like we already have for the row group statistics). What do you think @alamb, instead of using array builder?

pub(crate) fn min_page_statistics<'a, I: Iterator<Item = Option<&'a Index>>>(
    data_type: Option<&DataType>,
    iterator: I,
) -> Result<ArrayRef> {
    // Extract this into data type specific iterator e.g. MinInt64PageStatisticsIterator
    let iter = iterator.flat_map(|opt_index| match data_type {
        Some(DataType::Int64) => match opt_index {
            Some(Index::INT64(native_index)) => native_index
                .indexes
                .iter()
                .map(|x| x.min)
                .collect::<Vec<_>>(),
            _ => vec![None],
        },
        // other data_types
        _ => todo!(),
    });

    Ok(Arc::new(Int64Array::from_iter(iter)))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think an array builder is likely a better approach.

DataType::Boolean => {
// get an interator of Option<bool> across all indexes / pages
let iter = indexes
// flat map flattens the iterator over iterators into a single iterator over bools
.flat_map(|index| {
let page_indexes: &[PageIndex<bool>] =
if let Index::BOOLEAN(native_index) = index {
&native_index.indexes
} else {
&[]
};
page_indexes.iter().map(|v| v.min.clone())
});
// can't call this directly because the iterator above is not sized!
//Ok(Arc::new(BooleanArray::from_iter(iter)))
let mut builder = BooleanArray::builder(10);
for v in iter {
match v {
Some(v) => builder.append_value(v),
None => builder.append_null(),
};
}
Ok(Arc::new(builder.finish()))
}
_ => not_impl_err!("Datatype not yet implemented"),
}
}

/// Returns a null array of data_type with one element per entry in the metadatas
fn make_null_array<I, A>(&self, data_type: &DataType, metadatas: I) -> ArrayRef
where
I: IntoIterator<Item = &'a RowGroupMetaData>,
I: IntoIterator<Item = A>,
{
// column was in the arrow schema but not in the parquet schema, so return a null array
let num_row_groups = metadatas.into_iter().count();
Expand Down
164 changes: 124 additions & 40 deletions datafusion/core/tests/parquet/arrow_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ use arrow_array::{
use arrow_schema::{DataType, Field, Schema};
use datafusion::datasource::physical_plan::parquet::StatisticsConverter;
use half::f16;
use parquet::arrow::arrow_reader::{ArrowReaderBuilder, ParquetRecordBatchReaderBuilder};
use parquet::arrow::arrow_reader::{
ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
};
use parquet::arrow::ArrowWriter;
use parquet::file::properties::{EnabledStatistics, WriterProperties};

Expand Down Expand Up @@ -159,11 +161,14 @@ impl TestReader {

// open the file & get the reader
let file = file.reopen().unwrap();
ArrowReaderBuilder::try_new(file).unwrap()

// tell the reader to read the page index, if it exists
let options = ArrowReaderOptions::new().with_page_index(true);
ArrowReaderBuilder::try_new_with_options(file, options).unwrap()
}
}

/// Defines a test case for statistics extraction
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is kind of a weird test change, but I was trying to set up a pattern where we didn't have to change all the tests at once

/// Defines a test case for row group statistics extraction
struct Test<'a> {
/// The parquet file reader
reader: &'a ParquetRecordBatchReaderBuilder<File>,
Expand All @@ -186,43 +191,14 @@ impl<'a> Test<'a> {
column_name,
} = self;

let converter = StatisticsConverter::try_new(
run_test(
reader,
expected_min,
expected_max,
expected_null_counts,
expected_row_counts,
column_name,
reader.schema(),
reader.parquet_schema(),
)
.unwrap();

let row_groups = reader.metadata().row_groups();
let min = converter.row_group_mins(row_groups).unwrap();

assert_eq!(
&min, &expected_min,
"{column_name}: Mismatch with expected minimums"
);

let max = converter.row_group_maxes(row_groups).unwrap();
assert_eq!(
&max, &expected_max,
"{column_name}: Mismatch with expected maximum"
);

let null_counts = converter.row_group_null_counts(row_groups).unwrap();
let expected_null_counts = Arc::new(expected_null_counts) as ArrayRef;
assert_eq!(
&null_counts, &expected_null_counts,
"{column_name}: Mismatch with expected null counts. \
Actual: {null_counts:?}. Expected: {expected_null_counts:?}"
);

let row_counts = StatisticsConverter::row_group_row_counts(
reader.metadata().row_groups().iter(),
)
.unwrap();
assert_eq!(
row_counts, expected_row_counts,
"{column_name}: Mismatch with expected row counts. \
Actual: {row_counts:?}. Expected: {expected_row_counts:?}"
false,
);
}

Expand All @@ -247,6 +223,114 @@ impl<'a> Test<'a> {
}
}

/// Defines a test case for row group and data page statistics extraction
///
/// This is a temporary structure while the types supported by data page
/// statistics are different than the types supported by row group
/// statistics; once they match, this can likely be combined with `Test`.
struct TestBoth<'a> {
    /// The parquet file reader
    reader: &'a ParquetRecordBatchReaderBuilder<File>,
    /// Expected minimum values, one entry per row group
    expected_min: ArrayRef,
    /// Expected maximum values, one entry per row group
    expected_max: ArrayRef,
    /// Expected null counts, one entry per row group
    expected_null_counts: UInt64Array,
    /// Expected row counts, one entry per row group
    expected_row_counts: UInt64Array,
    /// Which column to extract statistics from
    column_name: &'static str,
}

impl<'a> TestBoth<'a> {
    /// Runs this test case, checking both the row group level and the
    /// data page (page index) level statistics.
    fn run(self) {
        // Delegate to the shared driver; the trailing `true` enables the
        // data page statistics checks in addition to the row group checks.
        run_test(
            self.reader,
            self.expected_min,
            self.expected_max,
            self.expected_null_counts,
            self.expected_row_counts,
            self.column_name,
            true,
        );
    }
}

/// Shared driver that extracts statistics for `column_name` from `reader`
/// via [`StatisticsConverter`] and compares them against the expected arrays.
///
/// Always checks the row group level statistics (min, max, null count, and
/// row count). When `test_data_page_statistics` is true, also checks the
/// data page (page index) minimums against the same expected values — the
/// test files are written so that row groups and data pages carry the same
/// statistics.
fn run_test(
    reader: &ParquetRecordBatchReaderBuilder<File>,
    expected_min: ArrayRef,
    expected_max: ArrayRef,
    expected_null_counts: UInt64Array,
    expected_row_counts: UInt64Array,
    column_name: &'static str,
    // if true, also check the data page statistics (which are expected to
    // equal the row group statistics for these test files)
    test_data_page_statistics: bool,
) {
    let converter = StatisticsConverter::try_new(
        column_name,
        reader.schema(),
        reader.parquet_schema(),
    )
    .unwrap();

    let row_groups = reader.metadata().row_groups();
    let min = converter.row_group_mins(row_groups).unwrap();

    assert_eq!(
        &min, &expected_min,
        "{column_name}: Mismatch with expected row group minimums"
    );

    let max = converter.row_group_maxes(row_groups).unwrap();
    assert_eq!(
        &max, &expected_max,
        "{column_name}: Mismatch with expected row group maximums"
    );

    let null_counts = converter.row_group_null_counts(row_groups).unwrap();
    let expected_null_counts = Arc::new(expected_null_counts) as ArrayRef;
    assert_eq!(
        &null_counts, &expected_null_counts,
        "{column_name}: Mismatch with expected row group null counts. \
        Actual: {null_counts:?}. Expected: {expected_null_counts:?}"
    );

    let row_counts =
        StatisticsConverter::row_group_row_counts(reader.metadata().row_groups().iter())
            .unwrap();
    assert_eq!(
        row_counts, expected_row_counts,
        "{column_name}: Mismatch with expected row group row counts. \
        Actual: {row_counts:?}. Expected: {expected_row_counts:?}"
    );

    if test_data_page_statistics {
        // The column index stores one Vec<Index> for each row group
        let column_index = reader
            .metadata()
            .column_index()
            .expect("File should have column indexes");

        // check the statistics of every row group in the file
        let row_group_indexes: Vec<usize> = (0..row_groups.len()).collect();

        let mins = converter
            .data_page_mins(column_index, &row_group_indexes)
            .unwrap();

        assert_eq!(
            &mins, &expected_min,
            "{column_name}: Mismatch with expected data page minimums"
        );
        // TODO maxes, null count, row count
    }
}

// TESTS
//
// Remaining cases
Expand Down Expand Up @@ -1724,7 +1808,7 @@ async fn test_boolean() {
.build()
.await;

Test {
TestBoth {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By changing a test to TestBoth it will test both row group indexes and data page indexes

reader: &reader,
expected_min: Arc::new(BooleanArray::from(vec![false, false])),
expected_max: Arc::new(BooleanArray::from(vec![true, false])),
Expand Down