Move Selection logic into ReadPlan builder #7537
Conversation
/// 1. To have no empty selections (that select no rows)
/// 2. fall on a batch_size boundary (e.g. 0, 100, 200, 300)
///
/// TODO change this structure to an enum with emit + mask
Thank you @alamb, this is a great idea. It means we can build the range/bitmap at build time, and the adaptive policy can also be applied here.
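As a rough illustration of what "build the bitmap at plan-build time" could mean, here is a minimal sketch that expands select/skip runs into a per-row mask. `RowSelector` is reduced to a hypothetical local struct; this is not the crate's actual API.

```rust
/// Hypothetical, simplified stand-in for parquet's RowSelector, used only to
/// illustrate the idea; the real type lives in the parquet crate.
#[derive(Clone, Copy)]
struct RowSelector {
    row_count: usize,
    skip: bool,
}

/// Expand a list of select/skip runs into a per-row mask at plan-build time.
fn selectors_to_mask(selectors: &[RowSelector]) -> Vec<bool> {
    let total: usize = selectors.iter().map(|s| s.row_count).sum();
    let mut mask = Vec::with_capacity(total);
    for s in selectors {
        // `true` means "emit this row", `false` means "skip it"
        mask.extend(std::iter::repeat(!s.skip).take(s.row_count));
    }
    mask
}

fn main() {
    let selectors = [
        RowSelector { row_count: 3, skip: false },
        RowSelector { row_count: 2, skip: true },
        RowSelector { row_count: 1, skip: false },
    ];
    assert_eq!(
        selectors_to_mask(&selectors),
        vec![true, true, true, false, false, true]
    );
}
```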
/// How to select the next batch of rows to read from the Parquet file
///
/// This allows the reader to dynamically choose between decoding strategies
pub(crate) enum RowsPlan {
Beautiful enum, it will include all the cases!
Thank you.
It isn't quite done yet, but it is getting close.
I am thinking I will likely make 2 PRs:
- one PR that rearranges when the selectors are created, but still uses RowSelector
- a second PR that will switch to use this enum
However, I will leave out the BooleanBuffer part of the enum initially, I think, to keep the review load manageable.
Then I am thinking we can adapt the code from these PRs
- Poc for adaptive parquet predicate pushdown(bitmap/range) with page cache(3 data pages) #7454
- Draft POC Unified filter decoder #7503
- POC Adaptive predicate push down based read plan #7524
to implement filtering via mask.
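For illustration only, here is one hypothetical shape such a selector-or-mask enum could take. The names and types below are made up for the sketch; the enum in this PR is `RowsPlan`, and the real `RowSelector`/`BooleanBuffer` live in the parquet and arrow crates.

```rust
use std::collections::VecDeque;

/// Illustrative stand-ins: the real RowSelector and BooleanBuffer live in the
/// parquet and arrow crates, and the enum in this PR is named RowsPlan.
#[allow(dead_code)]
struct RowSelector {
    row_count: usize,
    skip: bool,
}
#[allow(dead_code)]
struct BooleanMask(Vec<bool>);

/// One possible shape for a selection enum that supports both strategies.
#[allow(dead_code)]
enum BatchSelection {
    /// Select/skip runs: cheap to dispatch when the runs are long
    Selectors(VecDeque<RowSelector>),
    /// Per-row mask: decode the whole range and then filter, for short runs
    Mask(BooleanMask),
}
```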
It makes sense!
}
continue;
}
while read_records < batch_size {
The point of this PR is to (further) simplify this inner loop of the parquet decoder.
All the logic for splitting into batch sizes, etc. is now done in the ReadPlanBuilder,
so when this code is invoked it simply does whatever is called for.
The reason for this change is so we can add more complexity to the decision of what to do in subsequent PRs.
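Roughly, the shape this leaves the inner loop in might look like the sketch below. The trait and function names are illustrative stand-ins, not the decoder's real API, and error handling is omitted.

```rust
use std::collections::VecDeque;

/// Illustrative stand-ins, not the decoder's real types.
struct RowSelector {
    row_count: usize,
    skip: bool,
}

trait RecordReader {
    fn read_records(&mut self, n: usize) -> usize;
    fn skip_records(&mut self, n: usize) -> usize;
}

/// Consume the selectors planned for one batch. There is no batch-size math
/// here: the planner has already split the selectors so that none crosses a
/// batch boundary and no trailing skips remain.
fn read_one_batch<R: RecordReader>(
    batch_selectors: &mut VecDeque<RowSelector>,
    reader: &mut R,
) -> usize {
    let mut read_records = 0;
    while let Some(front) = batch_selectors.pop_front() {
        if front.skip {
            reader.skip_records(front.row_count);
        } else {
            read_records += reader.read_records(front.row_count);
        }
    }
    read_records
}
```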
It's clear!
if front.row_count == 0 {
continue;
}
if front.skip {
Other than error checking, the inner loop now simply reads a RowSelection and does what it says.
}

#[cfg(test)]
mod tests {
Technically the plan generation code is already fully covered by other tests in this crate, but I added new unit tests here to:
- Document the behavior better
- Make it easier to write tests for new behavior (like filter mask implementation)
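As an example of the kind of documenting test this enables, here is a sketch that checks the "no empty selectors, no trailing skips" guarantee against a hypothetical `normalize` helper. This is not the PR's actual test code.

```rust
#[derive(Debug, PartialEq, Clone, Copy)]
struct RowSelector {
    row_count: usize,
    skip: bool,
}

/// Hypothetical helper mirroring two of the plan guarantees:
/// drop empty selectors and any trailing skips.
fn normalize(mut selectors: Vec<RowSelector>) -> Vec<RowSelector> {
    selectors.retain(|s| s.row_count > 0);
    while matches!(selectors.last(), Some(s) if s.skip) {
        selectors.pop();
    }
    selectors
}

#[test]
fn empty_and_trailing_skip_selectors_are_removed() {
    let input = vec![
        RowSelector { row_count: 10, skip: false },
        RowSelector { row_count: 0, skip: false }, // empty: removed
        RowSelector { row_count: 5, skip: true },  // trailing skip: removed
    ];
    assert_eq!(
        normalize(input),
        vec![RowSelector { row_count: 10, skip: false }]
    );
}
```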
type Item = RowSelector;

fn next(&mut self) -> Option<Self::Item> {
while let Some(mut front) = self.input_selectors.pop_front() {
This logic used to be in the RecordBatchReader::next call. It is refactored out into its own module so it can be more easily tested and (eventually) extended.
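A minimal sketch of what such a splitting iterator can look like, using illustrative field and type names rather than the actual module's:

```rust
use std::collections::VecDeque;

/// Illustrative stand-in for the real RowSelector.
#[derive(Clone, Copy)]
struct RowSelector {
    row_count: usize,
    skip: bool,
}

/// Illustrative iterator: hands out selectors one at a time, splitting any
/// "select" run that would otherwise cross a batch_size boundary.
struct BatchSplitter {
    input_selectors: VecDeque<RowSelector>,
    batch_size: usize,
    /// rows selected so far within the current batch
    read_records: usize,
}

impl Iterator for BatchSplitter {
    type Item = RowSelector;

    fn next(&mut self) -> Option<Self::Item> {
        while let Some(mut front) = self.input_selectors.pop_front() {
            if front.row_count == 0 {
                continue; // drop empty selectors
            }
            if front.skip {
                return Some(front); // skips pass through unchanged
            }
            let remaining = self.batch_size - self.read_records;
            if front.row_count > remaining {
                // split: emit the part that fits this batch, push back the rest
                let rest = RowSelector {
                    row_count: front.row_count - remaining,
                    skip: false,
                };
                self.input_selectors.push_front(rest);
                front.row_count = remaining;
            }
            self.read_records = (self.read_records + front.row_count) % self.batch_size;
            return Some(front);
        }
        None
    }
}
```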
@@ -358,14 +361,6 @@ impl RowSelection {
self.selectors.iter().any(|x| !x.skip)
}

/// Trims this [`RowSelection`] removing any trailing skips
pub(crate) fn trim(mut self) -> Self {
This is moved into the plan.
I am pretty happy with how this currently looks, but before I mark it for review I want to make a proof of concept showing that I can actually improve performance with it.
🤖: Benchmark completed
LGTM. Thank you @alamb, great work!
}
continue;
}
while read_records < batch_size {
It's clear!
/// The returned stream of [`RowSelector`]s is guaranteed to have:
/// 1. No empty selections (that select no rows)
/// 2. No selections that span batch_size boundaries
/// 3. No trailing skip selections
///
/// For example, if the `batch_size` is 100 and we are selecting all 200 rows
/// from a Parquet file, the selectors will be:
/// - `RowSelector::select(100) <-- forced break at batch_size boundary`
/// - `RowSelector::select(100)`
Great work!
));
}
} else {
let read = self.array_reader.read_records(front.row_count)?;
Minor: do we have a fast path when read < front.row_count?
I am not quite sure what you mean here -- if read < row_count, I think that means the array is exhausted and the row group is done. What sort of fast path would it be?
That's right @alamb, sorry, I didn't describe it correctly. I meant: do we need to break early in that case? It looks like that's not needed.
🤔 It seems it is less efficient.
Update: this is not looking super promising and I am somewhat stuck on how to integrate mask-based selections into the logic more cleanly. I need to think about it some more. I may park this for a while and continue working on filter result caching.
🤖: Benchmark completed
Thank you very much @alamb, I can continue to help investigate why the performance regresses in some cases for this PR.
It looks like removing this check can improve performance a little, but some cases still regress; I can't find the root cause so far.

// Reader should read exactly `batch_size` records except for last batch
if !end_of_stream && (read_records != batch_size) {
    return Err(general_err!(
        "Internal Error: unexpected read count. Expected {batch_size} got {read_records}"
    ));
}
Thanks @zhuqi-lucas -- I will remove that check.
🤖: Benchmark completed
/// how many records have been read by RowSelection in the "current" batch
read_records: usize,
/// Input selectors to read from
input_selectors: VecDeque<RowSelector>,
I think Vec can be used here (track an index)?
I used VecDeque as that is how the current code does it. I can try it and see if it makes any difference.
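For reference, the Vec-plus-index alternative being discussed would look roughly like the sketch below (hypothetical names, not the code that was benchmarked). As noted further down, the benchmarks ended up favoring the VecDeque version here.

```rust
/// Illustrative stand-in for the real RowSelector.
#[derive(Clone, Copy)]
struct RowSelector {
    row_count: usize,
    skip: bool,
}

/// Sketch of tracking a cursor into a Vec instead of popping from a VecDeque.
struct SelectorCursor {
    selectors: Vec<RowSelector>,
    /// index of the next selector to hand out
    next: usize,
}

impl SelectorCursor {
    /// Equivalent of VecDeque::pop_front, but done by advancing an index.
    fn next_selector(&mut self) -> Option<RowSelector> {
        let sel = *self.selectors.get(self.next)?;
        self.next += 1;
        Some(sel)
    }
}
```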
Ok, the latest benchmark results I think are now better and show no regression, thanks to @zhuqi-lucas's suggestion. I will try @Dandandan's idea to use a Vec and see if that helps.
🤖: Benchmark completed
🤔 Seems like Vec made things slower (see results here).
🤖: Benchmark completed
Thank you @alamb, that's good news!
This PR has shown me that for some queries the dispatch overhead for RowSelection is quite high (as in, just doing an extra compare in that loop made a measurable difference). @zhuqi-lucas, in your testing, did you measure where the cutoff for using a bitmap vs a RowSelector was? I think I remember seeing a value of 10.
I agree @alamb, I was testing with 10 as the cutoff for using a bitmap vs a RowSelector. It's a very basic cutoff:

avg_size_of_selector = total_rows / num_selectors
- if avg_size_of_selector > 10, use selector
- if avg_size_of_selector <= 10, use bitmap

And the default is selector, because I use it to compute avg_size_of_selector.
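Expressed as code, that cutoff might look like the following sketch. The function and enum are hypothetical; the threshold of 10 is the experimental value described above.

```rust
/// Which representation to use for a window of selected rows.
enum SelectionKind {
    /// Long runs: use RowSelector ranges
    Selector,
    /// Short, scattered runs: use a per-row bitmap
    Bitmap,
}

/// The basic cutoff described above: compare the average run length per
/// selector against a fixed threshold (10 in the experiment above).
fn choose_selection_kind(total_rows: usize, num_selectors: usize) -> SelectionKind {
    if num_selectors == 0 {
        // no runs yet: default to selector, matching the description above
        return SelectionKind::Selector;
    }
    let avg_size_of_selector = total_rows / num_selectors;
    if avg_size_of_selector > 10 {
        SelectionKind::Selector
    } else {
        SelectionKind::Bitmap
    }
}
```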
Makes sense -- thank you. I found ... The other thing I couldn't easily work out was if there was any way to switch from ... Or maybe we could just add a third type of ...
Thank you @alamb, this is a very good point:
It may happen that we get 8192 => bitmap, 8192 => selector, 8192 => bitmap... We can't use only one of them for everything and still have it be optimal.
For example, suppose we have an output batch after selecting 5 batch-size windows:
We can merge windows 1 and 2 because they are both bitmaps. But I think we can start from the basic optimization and only use the batch-size window to decide between bitmap and selector; later, we can optimize further. Maybe we can have only selectors in the ReadPlan, but for an adaptive window size (currently fixed at batch size), we can change to a bitmap if it's dense, as a first step...
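Purely as a sketch of the idea discussed here (nothing like this is in the PR), the per-window choice plus the merge step could look like:

```rust
/// Illustrative per-window decision: each batch_size window picks one strategy.
#[derive(Debug, PartialEq, Clone, Copy)]
enum WindowPlan {
    Selector, // sparse selection / long runs
    Bitmap,   // dense, scattered selection
}

/// Merge adjacent windows that chose the same strategy, e.g.
/// [Bitmap, Bitmap, Selector, Bitmap] -> [(Bitmap, 2), (Selector, 1), (Bitmap, 1)]
fn merge_adjacent(windows: &[WindowPlan]) -> Vec<(WindowPlan, usize)> {
    let mut merged: Vec<(WindowPlan, usize)> = Vec::new();
    for &w in windows {
        match merged.last_mut() {
            Some((kind, count)) if *kind == w => *count += 1,
            _ => merged.push((w, 1)),
        }
    }
    merged
}
```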
This is an interesting idea and I think it is worth exploring.
👍 Another thing that makes this tricky in my mind is that if ...
Very good point @alamb! It's hard for us to reduce its overhead; maybe we can set something like max_bitmap_iterator: when the bitmap iterator count hits > max_bitmap_iterator, we can consume it first as an output batch, and then merge those batches at the end. But I am not sure whether it will make the performance worse than using selectors.
Note that 400+ lines of this PR are new unit tests. The actual code change is much smaller.
Draft until
Which issue does this PR close?
This is a step towards implementing Adaptive parquet filter selections:
Rationale for this change
Part of the idea of adaptive decoding is the need to have different read strategies based on the patterns of rows selected.
The current code mixes ...
This makes it hard to add additional complexity to determining the read/skip pattern; for example, @zhuqi-lucas had to put the bitmap selection logic in the middle of the decoder here:
Similarly to the way the filter kernel decides up front how to scan, I think we should also change the parquet reader to determine what to do up front and then just do it during decode. Splitting the planning from the execution also gives us a place to generate (and unit test) various heuristics for the plan.
Change
There is no change in behavior intended -- the selection evaluation is not yet adaptive. This is meant to be a pure refactoring. I have added tests and a test framework to make it easier to make this adaptive in the future.
What changes are included in this PR?
Are there any user-facing changes?
Next up, I will change from RowSelector to a different enum.