Poc for adaptive parquet predicate pushdown (bitmap/range) with page cache (3 data pages) #7454

Conversation
Only one remaining CI test is failing, and I still can't find the root cause:

```rust
#[tokio::test]
#[cfg(feature = "snap")]
async fn test_plaintext_footer_read_without_decryption() {
    crate::encryption_agnostic::read_plaintext_footer_file_without_decryption_properties_async()
        .await;
}
```
Looks sweet -- thank you @zhuqi-lucas -- I am going to start reviewing this PR to get a feel for its code and be ready to merge / get it ready.
Thank you @alamb, I found the root cause of the failure above, and I am also working on making everything green.
You are the best @zhuqi-lucas -- I am reviewing this PR now
First of all, thank you so much @zhuqi-lucas.
I really like your code in FilteredParquetRecordBatchReader -- the idea of combining the application of the RowFilter and the decoding of the projection into a single reader is, I think, a key insight and maybe points the way towards not decoding twice.
After reviewing this code, it seems to me that a lot of work is done to use the same RowSelection structure for both:
- Skipping large contiguous chunks of rows (e.g. row groups and entire pages)
- Applying a RowFilter for filtering individual rows
I think RowSelection is well designed for the former, but quite bad for the latter (applying RowFilter).
As this PR starts down the path of separating the two concerns, I wonder if you have thought about pushing it even farther? Something like keeping the results of the RowFilter only as BooleanArrays and then progressively decoding the remaining projections, as in the sketch below?
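A minimal sketch of that idea, assuming a hypothetical `FilterMask` wrapper and a caller-supplied decode closure (neither exists in this PR):

```rust
use arrow::array::BooleanArray;
use arrow::compute::filter_record_batch;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Hypothetical: the result of evaluating a RowFilter, kept as a bitmap
/// instead of being converted back into RowSelector runs.
struct FilterMask {
    mask: BooleanArray,
}

/// Decode one batch of the remaining projected columns, then apply the
/// previously computed mask -- no second predicate evaluation needed.
fn decode_projection_with_mask(
    decode_next_batch: impl FnOnce() -> Result<RecordBatch, ArrowError>,
    filter_mask: &FilterMask,
) -> Result<RecordBatch, ArrowError> {
    let batch = decode_next_batch()?;
    // The mask was produced while decoding the predicate columns, so its
    // length matches the decoded batch
    filter_record_batch(&batch, &filter_mask.mask)
}
```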
```rust
let array = reader.consume_batch()?;

let filtered_array =
    filter(&array, bitmap).map_err(|e| ParquetError::General(e.to_string()))?;
```
This is a very clever idea (keeping the filter and applying it to the next decoded batch).
```rust
    self.row_filter.take()
}

fn create_bitmap_from_ranges(&mut self, runs: &[RowSelector]) -> BooleanArray {
```
This code path is unfortunate -- converting from Bitmap --> RowSelection. I have some ideas about how this could be better if we avoided this particular code path.
Thank you @alamb, very good point. I was also thinking that if we can return the bitmap from the predicate filter directly, we will have better performance. I will try to make this improvement too.
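The shape I have in mind is roughly the following; `evaluate_to_bitmap` is a hypothetical helper, while `ArrowPredicate` is the existing parquet trait whose `evaluate` already returns a `BooleanArray`:

```rust
use arrow::array::BooleanArray;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_reader::ArrowPredicate;

/// Hypothetical: keep the predicate result as a bitmap, rather than
/// round-tripping it through RowSelector runs and back via
/// create_bitmap_from_ranges
fn evaluate_to_bitmap(
    predicate: &mut dyn ArrowPredicate,
    batch: RecordBatch,
) -> Result<BooleanArray, ArrowError> {
    // evaluate() already produces a BooleanArray; the improvement is to
    // thread it through unchanged instead of converting representations
    predicate.evaluate(batch)
}
```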
```rust
pub struct FilteredParquetRecordBatchReader {
    batch_size: usize,
    array_reader: Box<dyn ArrayReader>,
```
This is a really nice idea -- to have both array_readers and predicate_readers in the same structure. If we push this idea even more, it could be used to avoid the second decode entirely and not modify RowSelection at all.
```rust
/// [`RowSelection`] is an enum that can be either a list of [`RowSelector`]s
/// or a [`BooleanArray`] bitmap
#[derive(Debug, Clone, PartialEq)]
pub enum RowSelection {
```
Given that the two representations have different uses:
- Skip contiguous ranges (basically skip entire data pages)
- Filter out individual rows
I think we may be able to keep these as two separate structs rather than combining them into a single struct, along the lines of the sketch below.
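As a sketch of that separation (the names `SkipRanges` and `RowMask` are placeholders I made up, not proposals from the PR):

```rust
use arrow::array::BooleanArray;
use parquet::arrow::arrow_reader::RowSelector;

/// Hypothetical: coarse-grained selection used to skip whole pages or row
/// groups -- the case RowSelector runs handle well
struct SkipRanges {
    runs: Vec<RowSelector>,
}

/// Hypothetical: fine-grained, per-row result of applying a RowFilter --
/// the case where a bitmap is the natural representation
struct RowMask {
    mask: BooleanArray,
}
```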
Good suggestion, I also feel it's strange here; we'd better use two structs.
Thank you @alamb for the review. I agree we can go further in this PR, and I will try to do it. The key improvement to reduce the regression is: when the average selection length is < 10, we fall back to reading all the rows and then filtering, which is faster because it vectorizes better:

```rust
if total < 10 * select_count {
    // Bitmap branch
    let bitmap = self.create_bitmap_from_ranges(&runs);
    match self.array_reader.read_records(bitmap.len()) {
        Ok(_) => {}
        Err(e) => return Some(Err(e.into())),
    };
    mask_builder.append_buffer(bitmap.values());
    rows_accum += bitmap.true_count();
}
```

I agree create_bitmap_from_ranges has some overhead; if we can return the bitmap from the predicate filter, we will have better performance. I will try to do this improvement also.
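To spell out the heuristic in that snippet: `total` is the number of rows covered by the runs, and the condition `total < 10 * select_count` means the average run is shorter than 10 rows. A standalone sketch, where the function name and my reading of `select_count` as the number of select runs are both assumptions:

```rust
use parquet::arrow::arrow_reader::RowSelector;

/// Hypothetical helper: dense, short runs favor the bitmap branch (decode
/// everything, then filter) over skip-based decoding
fn should_use_bitmap(runs: &[RowSelector]) -> bool {
    let total: usize = runs.iter().map(|r| r.row_count).sum();
    let select_count = runs.iter().filter(|r| !r.skip).count();
    select_count > 0 && total < 10 * select_count
}
```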
I wrote up an idea here: #7456 (comment) -- I think the design sketched out there (and inspired by this PR) would always be as good as or better than the current DataFusion approach of decode + filter, and thus we could turn it on by default. I wonder what you think? I would love to help / collaborate with you.
I suggest we create a new PR for the next POC (unified filter + decoder). I am happy to do so, but if you make one it might be easier to collaborate.
Great! Thanks @alamb, I'd like to try this!
Let's do it!
As an aside, another thing that keeps coming up in these designs is this primitive. It will potentially show up here again too: when we need to build up a final array after applying filters, at the moment the code will be forced to do …
Maybe we can do this at the Arrow low level too, so that DataFusion can benefit from it?
100%. I am evaluating whether I should try to work on this or not; I already feel spread quite thin. However, if you are going to take the lead on parquet predicate evaluation, it might be a good option for me to work on while waiting to review your PRs 🤔
Hi @alamb, I tried today to submit a new PR for #7456 (comment), but it seems hard for me to wrap up the new approach; it's more complex than I expected, so feel free to take it. I can also help review, optimize, and test, thanks! I was trying something like this, and it's hard for me to integrate the whole workflow and all the corner cases:

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, BooleanArray};
use arrow::buffer::BooleanBuffer;
use arrow::compute::{and, concat, filter_record_batch};
use arrow::error::ArrowError;
use parquet::arrow::arrow_reader::{ArrowPredicate, ParquetRecordBatchReader};

fn evaluate_predicate_batch(
    batch_size: usize,
    mut filter_reader: ParquetRecordBatchReader,
    mut predicates: Vec<Box<dyn ArrowPredicate>>,
) -> Result<BooleanArray, ArrowError> {
    let mut passing = Vec::with_capacity(batch_size);
    let mut total_selected = 0;
    // Filtered batches are collected but not yet wired into the rest of
    // the workflow in this draft
    let mut batches = Vec::new();
    while total_selected < batch_size {
        match filter_reader.next() {
            Some(Ok(batch)) => {
                // Apply predicates sequentially and combine with AND
                let mut combined_mask: Option<BooleanArray> = None;
                for predicate in predicates.iter_mut() {
                    let mask = predicate.evaluate(batch.clone())?;
                    if mask.len() != batch.num_rows() {
                        return Err(ArrowError::ComputeError(format!(
                            "Predicate returned {} rows, expected {}",
                            mask.len(),
                            batch.num_rows()
                        )));
                    }
                    combined_mask = match combined_mask {
                        Some(prev) => Some(and(&prev, &mask)?),
                        None => Some(mask),
                    };
                }
                if let Some(mask) = combined_mask {
                    batches.push(filter_record_batch(&batch, &mask)?);
                    total_selected += mask.true_count();
                    passing.push(mask);
                } else {
                    // No predicates: every row in the batch passes
                    let len = batch.num_rows();
                    let buffer = BooleanBuffer::new_set(len);
                    let mask = BooleanArray::new(buffer, None);
                    total_selected += len;
                    passing.push(mask);
                }
            }
            Some(Err(e)) => return Err(e),
            None => break,
        }
    }
    // Concatenate the per-batch masks into a single BooleanArray
    let arrays: Vec<ArrayRef> = passing
        .into_iter()
        .map(|b| Arc::new(b) as ArrayRef)
        .collect();
    let refs: Vec<&dyn Array> = arrays.iter().map(|a| a.as_ref()).collect();
    let combined = concat(&refs)?;
    let boolean_combined = combined
        .as_any()
        .downcast_ref::<BooleanArray>()
        .unwrap()
        .clone();
    Ok(boolean_combined)
}
```
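A hypothetical usage sketch, just to show the intended call shape (the file path, batch size, and empty predicate list are placeholders):

```rust
use std::fs::File;

use parquet::arrow::arrow_reader::{ArrowPredicate, ParquetRecordBatchReaderBuilder};

fn example() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical: a reader configured over just the predicate columns
    let file = File::open("data.parquet")?;
    let filter_reader = ParquetRecordBatchReaderBuilder::try_new(file)?
        .with_batch_size(1024)
        .build()?;

    // In the real reader these would come from the pushed-down RowFilter
    let predicates: Vec<Box<dyn ArrowPredicate>> = vec![];

    let mask = evaluate_predicate_batch(1024, filter_reader, predicates)?;
    println!("{} of {} rows pass", mask.true_count(), mask.len());
    Ok(())
}
```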
Thanks @zhuqi-lucas -- I have some ideas I will try out later today.
I have been studying the code today -- I have a good idea of what I want to try.
Thank you @alamb, I submitted a very rough draft PR today; it still needs polish.
```rust
}
Some(RowSelection::BitMap(bitmap)) => {
```
BTW I hope to reuse some/all of this code (so it can iterate based on BitMap or RowSelection).
My idea is to switch this code to use a different structure than RowSelection (something like ResolvedRowSelection).
This control flow I think is very similar to what @tustvold describes in #5523.
The remaining open question in my mind is what heuristics to use to decide when to use RowSelection / ranges and when to use BitMaps.
Thank you @alamb. I think the first adaptive case is: if each select/skip run is very small and dense, for example < 10 rows on average, we should use a bitmap, based on the testing results. I can do more testing based on your read-plan PR with the cache merged.

```rust
if total < 10 * select_count {
    // Bitmap branch
    let bitmap = self.create_bitmap_from_ranges(&runs);
    match self.array_reader.read_records(bitmap.len()) {
        Ok(_) => {}
        Err(e) => return Some(Err(e.into())),
    };
    mask_builder.append_buffer(bitmap.values());
    rows_accum += bitmap.true_count();
}
```
Which issue does this PR close?
- and Parquet decoder / decoded page Cache #7363

Rationale for this change

What changes are included in this PR?

Are there any user-facing changes?