Add support for file row numbers in Parquet readers #7307
Conversation
Thanks for your submission @jkylling, I'll try to get a first pass review done this week. In the meantime please add the Apache license to row_number.rs and correct the other lint errors. 🙏
Updated. Looking forward to the first review! I was very confused as to why cargo format did not work properly, but it looks like you are already aware of this (#6179) :)
Partial review, just a few nits for now.
Thanks again @jkylling for taking this on. I've finished my first pass and have only one reservation. Otherwise it looks good and meets the criteria set forth in #7299 (comment).
row_groups: VecDeque::from(
    row_groups
        .into_iter()
        .map(TryInto::try_into)
        .collect::<Result<Vec<_>>>()?,
),
})
I'm finding myself a bit uneasy with adding the first row number to the RowGroupMetaData. Rather than that, could this bit here instead be changed to keep track of the first row number while populating the deque? Is there some wrinkle I'm missing? Might the row groups be filtered before instantiating the RowNumberReader?
Answered my own question...it seems there's some complexity here at least when using the async reader.
Yes, I believe we don't have access to all row groups when creating the array readers.
I took a quick look at the corresponding Parquet reader implementations for Trino and parquet-java.
Trino:
- Has a boolean to include a row number column: https://github.com/trinodb/trino/blob/a54d38a30e486a94a365c7f12a94e47beb30b0fa/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java#L112
- Includes this column when the boolean is set: https://github.com/trinodb/trino/blob/a54d38a30e486a94a365c7f12a94e47beb30b0fa/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java#L337
- Has a special block reader for reading row indexes: https://github.com/trinodb/trino/blob/a54d38a30e486a94a365c7f12a94e47beb30b0fa/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java#L385-L393. I believe the positions play a similar role to our RowSelectors.
- Gets row indexes from RowGroupInfo, a pruned version of https://github.com/trinodb/trino/blob/a54d38a30e486a94a365c7f12a94e47beb30b0fa/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java#L456
- Populates the fileRowOffset by iterating through the row groups: https://github.com/trinodb/trino/blob/master/lib/trino-parquet/src/main/java/io/trino/parquet/metadata/ParquetMetadata.java#L107-L111
parquet-java:
- Has a method for tracking the current row index: https://github.com/apache/parquet-java/blob/7d1fe32c8c972710a9d780ec5e7d1f95d871374d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java#L150-L155
- This row index is based on an iterator which starts from a row group row index: https://github.com/apache/parquet-java/blob/7d1fe32c8c972710a9d780ec5e7d1f95d871374d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java#L311-L339
- This row group row index is initialized by iterating through the row groups: https://github.com/apache/parquet-java/blob/7d1fe32c8c972710a9d780ec5e7d1f95d871374d/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L1654-L1656 (mapping obtained here: https://github.com/apache/parquet-java/blob/7d1fe32c8c972710a9d780ec5e7d1f95d871374d/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L1496-L1506)
Their approaches are rather similar to ours.
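Both implementations derive each row group's starting row number the same way: a running sum of the row counts of all preceding row groups. A minimal illustration of that computation (a hypothetical helper, not code from either project):

```rust
// Compute the file-global first row number of each row group as the
// cumulative sum of the row counts of all preceding groups.
fn first_row_numbers(num_rows_per_group: &[i64]) -> Vec<i64> {
    let mut offset = 0i64;
    num_rows_per_group
        .iter()
        .map(|&n| {
            let first = offset;
            offset += n;
            first
        })
        .collect()
}
```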
One takeaway is that the above implementations do not keep the full RowGroupMetaData structs around, as we do by requiring an iterator over RowGroupMetaData in the RowGroups trait. This is likely a good idea, as this struct can be quite large. What do you think about changing the RowGroups trait to something like the below?
/// A collection of row groups
pub trait RowGroups {
    /// Get the number of rows in this collection
    fn num_rows(&self) -> usize {
        self.row_group_infos().map(|info| info.num_rows).sum()
    }
    /// Returns a [`PageIterator`] for the column chunks with the given leaf column index
    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>>;
    /// Returns an iterator over the row groups in this collection
    fn row_group_infos(&self) -> Box<dyn Iterator<Item = &RowGroupInfo> + '_>;
}

struct RowGroupInfo {
    num_rows: usize,
    row_index: i64,
}
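As a sanity check that a default num_rows can be built on top of the proposed iterator method, here is a std-only toy version of that trait shape (names taken from the sketch above; the column_chunks/PageIterator method is omitted purely for illustration):

```rust
// Toy, std-only version of the proposed `RowGroups` shape.
struct RowGroupInfo {
    num_rows: usize,
    row_index: i64,
}

trait RowGroups {
    fn row_group_infos(&self) -> Box<dyn Iterator<Item = &RowGroupInfo> + '_>;

    // Default implementation: total rows is the sum over all row groups.
    fn num_rows(&self) -> usize {
        self.row_group_infos().map(|info| info.num_rows).sum()
    }
}

struct InMemoryRowGroups(Vec<RowGroupInfo>);

impl RowGroups for InMemoryRowGroups {
    fn row_group_infos(&self) -> Box<dyn Iterator<Item = &RowGroupInfo> + '_> {
        Box::new(self.0.iter())
    }
}
```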
I don't think this is necessary... the full ParquetMetaData is already available everywhere this trait is implemented, so I don't see a need to worry about adding another metadata structure here.
parquet/src/file/metadata/mod.rs (outdated)
@@ -584,6 +585,11 @@ impl RowGroupMetaData {
        self.num_rows
    }

    /// Returns the first row number in this row group.
Suggested change:
- /// Returns the first row number in this row group.
+ /// Returns the global index number for the first row in this row group.

And perhaps use first_row_index instead? That may be clearer.
Agree. Updated.
Sorry @jkylling, things have been rather hectic lately. I'll try to give it another look this week, along with some benchmarking (but I don't expect any perf hit). I'll just note that since this is a breaking change, it won't be able to be merged until the next major release (July-ish IIRC), so there's plenty of time to get this right. Also, I'll be deferring to those with more project history (e.g. @alamb @tustvold) as to whether the approach here is the best way to achieve the goal. Thank you for your contribution and your patience! 😄
Yeah, sorry I also have been slammed with many other projects. I'll try and find time to look but I suspect it may be a while
Thank you for the update, and totally understand other responsibilities are taking up your time. I'll keep on being patient, and maybe do some minor improvements to this PR (use a smaller struct than the full RowGroupMetadata, and add some benchmarks for the RowNumberReader). Just want to make sure we have this PR ready before the next major release approaches.
Yes, a benchmark that shows minimal impact with no row numbers would be nice (and hopefully adding row numbers won't be bad either 😄).
let mut row_groups = Vec::new();
t_file_metadata.row_groups.sort_by_key(|rg| rg.ordinal);
Are these sorts necessary? Would the ordinal ever be out of order? They shouldn't be if I understand the encryption spec correctly.
100% agreed that simplicity and maintainability are paramount... but row numbers are a pretty fundamental feature that's very hard to emulate in higher layers if the parquet reader doesn't support them. Back when https://github.com/delta-io/delta first took a dependency on row numbers, Spark's parquet reader did not yet support them; we had to disable row group pruning and other optimizations in order to make it (mostly) safe to manually compute row numbers in the query engine. It was really painful. AFAIK, most parquet readers now support row numbers. We can add DuckDB and Iceberg to the ones already mentioned above. I was actually surprised to trip over this PR and learn that arrow-parquet does not yet support row numbers.
@@ -52,12 +70,13 @@ fn build_reader(
    field: &ParquetField,
    mask: &ProjectionMask,
    row_groups: &dyn RowGroups,
    row_number_column: Option<&str>,
Maybe a crazy idea, but wouldn't the implementation be simpler (and more flexible) with a RowNumber extension type? Then users could do e.g. Field::new("row_index", DataType::Int64, false).with_extension_type(RowNumber) and build_primitive_reader could just check for it, no matter where in the schema it hides, instead of implicitly adding an extra column to the schema?
Update: I don't think raw parquet types support metadata, so this may not be an option.
This would simplify usage of the feature. Having to keep track of the additional row number column is quite cumbersome in clients of this API. One option could be to extend ParquetFieldType with an additional row number type and add it based on the extension type in ArrowReaderMetadata::with_supplied_metadata. @etseidl @alamb, what do you think about this approach?
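For context, Arrow extension types travel as field metadata under the key ARROW:extension:name, so detection could look roughly like this. In the sketch below, Field is a std-only stand-in for arrow_schema::Field, and the extension name string is hypothetical:

```rust
use std::collections::HashMap;

// Stand-in for `arrow_schema::Field`: the real type also carries a metadata map.
struct Field {
    name: String,
    metadata: HashMap<String, String>,
}

// Arrow's standard metadata key for extension type names.
const EXTENSION_KEY: &str = "ARROW:extension:name";
// Hypothetical extension name for the row-number type discussed above.
const ROW_NUMBER_EXTENSION: &str = "arrow.row_number";

// A reader could check any field in the schema for the extension, instead of
// implicitly appending a column.
fn is_row_number_field(field: &Field) -> bool {
    field.metadata.get(EXTENSION_KEY).map(String::as_str) == Some(ROW_NUMBER_EXTENSION)
}
```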
struct RowGroupSizeIterator {
    row_groups: VecDeque<RowGroupSize>,
}

impl RowGroupSizeIterator {
    fn try_new<I>(row_groups: impl IntoIterator<Item = I>) -> Result<Self>
    where
        I: TryInto<RowGroupSize, Error = ParquetError>,
It seems like this whole RowGroupSizeIterator thing is a complicated and error-prone way of chaining several Range<i64>? Can we use standard iterator machinery instead?
pub(crate) struct RowNumberReader {
    buffered_row_numbers: Vec<i64>,
    remaining_row_numbers: std::iter::Flatten<std::vec::IntoIter<std::ops::Range<i64>>>,
}

impl RowNumberReader {
    pub(crate) fn try_new<'a>(
        row_groups: impl Iterator<Item = &'a RowGroupMetaData>,
    ) -> Result<Self> {
        let ranges = row_groups
            .map(|rg| {
                let first_row_number = rg.first_row_index().ok_or(ParquetError::General(
                    "Row group missing row number".to_string(),
                ))?;
                Ok(first_row_number..first_row_number + rg.num_rows())
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(Self {
            buffered_row_numbers: Vec::new(),
            remaining_row_numbers: ranges.into_iter().flatten(),
        })
    }

    // Use `take` on a `&mut` iterator to consume a number of elements without
    // consuming the iterator itself.
    fn take(&mut self, batch_size: usize) -> impl Iterator<Item = i64> + '_ {
        (&mut self.remaining_row_numbers).take(batch_size)
    }
}

impl ArrayReader for RowNumberReader {
    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
        let starting_len = self.buffered_row_numbers.len();
        // Borrow the two fields separately; calling `self.take` here would
        // conflict with the mutable borrow of `buffered_row_numbers`.
        let taken = (&mut self.remaining_row_numbers).take(batch_size);
        self.buffered_row_numbers.extend(taken);
        Ok(self.buffered_row_numbers.len() - starting_len)
    }

    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
        Ok(self.take(num_records).count())
    }
This is much simpler. Thank you! I suspect we are missing out on some performance in skip_records with this, but the bulk of the data pruning will likely have happened by pruning Parquet row groups already.
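If skip_records ever does show up in profiles, skipping could jump over whole row-group ranges instead of advancing the flattened iterator element by element. A hypothetical std-only sketch of that idea (not part of the PR):

```rust
use std::collections::VecDeque;
use std::ops::Range;

// Remaining row-number ranges, one per row group, front first.
struct RowNumberRanges {
    ranges: VecDeque<Range<i64>>,
}

impl RowNumberRanges {
    // Skip up to `n` row numbers, dropping whole ranges in O(1) each.
    fn skip(&mut self, mut n: usize) -> usize {
        let mut skipped = 0;
        while n > 0 {
            let Some(front) = self.ranges.front_mut() else { break };
            let len = (front.end - front.start) as usize;
            if len <= n {
                // The whole row group is skipped; drop its range outright.
                self.ranges.pop_front();
                skipped += len;
                n -= len;
            } else {
                // Partially consume the front range.
                front.start += n as i64;
                skipped += n;
                n = 0;
            }
        }
        skipped
    }
}
```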
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@scovich I see you are involved in the maintenance of delta-kernel-rs. If you are interested, I've started on an implementation of deletion vector read support in delta-rs in this branch, based on a backport of an early version of this PR to arrow-54.2.1. The PR is still very rough, but the read path has got okay test coverage and it's able to read tables with deletion vectors produced by Spark correctly. The write support for deletion vectors is rudimentary (deletion vectors are only used for deletes when configured, and deleting from the same file twice is unsupported), and is mostly there to be able to unit test the read support. Unfortunately, I've not had time to work on this lately.
FWIW @zhuqi-lucas and I are working on improvements to the filter application here, which may result in some additional API churn:
Co-authored-by: scovich <[email protected]>
Which issue does this PR close?
Closes #7299.
What changes are included in this PR?
In this PR we:
- Add an option on ArrowReaderBuilder to set a row_number_column used to extend the read RecordBatches with an additional column with file row numbers.
- Add an ArrayReader to the vector of ArrayReaders reading columns from the Parquet file, if the row_number_column is set in the reader configuration. This is a RowNumberReader, which is a special ArrayReader. It reads no data from the Parquet pages, but uses the first row numbers in the RowGroupMetaData to keep track of progress.

The RowGroupMetaData::first_row_number is Option<i64>, since it is possible that the row number is unknown (I encountered an instance of this when trying to integrate this PR in delta-rs), and it's better if None is used instead of some special integer value.

The performance impact of this PR should be negligible when the row number column is not set. The only additional overhead would be the tracking of the first_row_number of each row group.

Are there any user-facing changes?
We add an additional public method: ArrowReaderBuilder::with_row_number_column.
There are a few breaking changes as we touch a few public interfaces:
- RowGroupMetaData::from_thrift and RowGroupMetaData::from_thrift_encrypted take an additional parameter first_row_number: Option<i64>.
- RowGroups has an additional method RowGroups::row_groups. Potentially this method could replace the RowGroups::num_rows method or provide a default implementation for it.
- A new error variant ParquetError::RowGroupMetaDataMissingRowNumber.

I'm very open to suggestions on how to reduce the amount of breaking changes.