Skip to content

[EPIC] Faster performance for parquet predicate evaluation for non selective filters #7456

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
2 of 7 tasks
alamb opened this issue Apr 29, 2025 · 9 comments
Open
2 of 7 tasks
Assignees
Labels
enhancement Any new improvement worthy of a entry in the changelog parquet Changes to the parquet crate

Comments

@alamb
Copy link
Contributor

alamb commented Apr 29, 2025

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

When evaluating filters on data stored in parquet, you can:

  1. Use the with_row_filter API to apply predicates during the scan
  2. Read the data and apply the predicate using the filter kernel afterwards

Currently, it is faster to use with_row_filter for some predicates and filter for others. In DataFusion we have a configuration setting to choose between the strategies (filter_pushdown, see apache/datafusion#3463) but that is a bad UX as it
means the user must somehow know which strategy to choose, but the strategy changes

In general the queries that are slower when with_row_filter is used:

  1. The predicates are not very selective (e.g. they pass more than 1% of the rows)
  2. The filters are applied to columns which are also used in the query result (e.g. the a filter column is also in the projection)

More Background:

The predicates are provides as a RowFilter (see docs for more details)

RowFilter applies predicates in order, after decoding only the columns required. As predicates eliminate rows, fewer rows from subsequent columns may be required, thus potentially reducing IO and decode.

Describe the solution you'd like

I would like the evaluation of predicates in RowFilter (aka pushed down predicates) to never be worse than decoding the columns first and then filtering them with the filter kernel

We have added a benchmark #7401, which hopefully can

cargo bench --all-features --bench arrow_reader_row_filter

Describe alternatives you've considered
This goal will likely require several changes to the codebase. Here are some options:

@alamb alamb added the enhancement Any new improvement worthy of a entry in the changelog label Apr 29, 2025
@alamb alamb added the parquet Changes to the parquet crate label Apr 29, 2025
@alamb
Copy link
Contributor Author

alamb commented Apr 30, 2025

I just spoke with @XiangpengHao -- from my perspective the current status is:

  1. Parquet decoder / decoded page Cache #7363: blocked on getting some benchmark results that show the decoded page cache improves performance; Then we can proceed / merge the page cache change
  2. In paralell / then we can move on to working on a better representation for RowFilter (Adaptive Parquet Predicate Pushdown Evaluation #5523 / Consider removing skip from RowSelector #7450 / RowSelection::and_then is slow #7458)

@alamb
Copy link
Contributor Author

alamb commented May 8, 2025

Fascinatingly, Clickbench released a blog post recently about their parquet pushdown work

https://clickhouse.com/blog/clickhouse-and-parquet-a-foundation-for-fast-lakehouse-analytics

Possibly even more interesting is that they link to a master's thesis from Peter Boncz's group about how to quickly evaluate predicates during Parquet Decoding: https://homepages.cwi.nl/~boncz/msc/2018-BoudewijnBraams.pdf

This thesis directly addresses some of the work we are considering (though they only consider Selection masks (bitmask) and Selection Vector (selected indices)

@alamb
Copy link
Contributor Author

alamb commented May 12, 2025

@zhuqi-lucas has a great insight in #7454 -- namely that instead of a two pass algorithm (evaluate RowFilter to form a final RowSelection and then re-decode the filter) we can combine the filter application and decode steps (see #7454 (review))

The current flow goes something like:

  1. A set of array readers is created for the filter columns, and uses the provided RowSelection (this captures prunning
    pages ).
  2. The decoded batches are used to evaluate the RowFilter / ArrowPredicates, which produces a BooleanArray bitmap
  3. The "final" RowSelection is created, by unioning the existing RowSelection with the BooleanArrays
  4. A new set of array readers is created with the updated RowSelection

The current PR starts heading down a slightly modified flow, where the RowSelection and RowFilters are not combined.

I think a combined solution would look something like:

  1. Create Decoders for filter columns and projection (only) columns

Decoding proceeds like:

  1. read rows from initial RowSelection (reads a 8192 rows) from filter columns, if any
  2. Apply any RowFilters on it (produces a BooleanArray)
  3. repeat 1-2 until there are at least 8192 (batch size) rows that pass the filter. (This means we have Vec<BooleanArray> with 8192 1s and a Vec for each filter column that is also a projection column)
  4. Then decode as maby RecordBatches from the projection (only) columns using the initial RowSelection)
  5. Apply the filters to each array to form the final output batch (in projection columns)

@alamb
Copy link
Contributor Author

alamb commented May 16, 2025

@zhuqi-lucas and I have been working on various strategies / structures to make the filtering faster. I believe we now have evidence enough to proceed with a more sophisticated implementation

Specifically,

Thus my next steps will be:

  1. Create a few refactoring PRs that gets the predicate code into shape

Perhaps then @zhuqi-lucas can help port the hybrid Filter/RowSelection to the ReadPlan to get better performance without changing any public interfaces

@zhuqi-lucas
Copy link
Contributor

Thank you @alamb , i will help port the hybrid Filter/RowSelection to the ReadPlan to get better performance without changing any public interfaces and also testing, review.

@alamb
Copy link
Contributor Author

alamb commented May 16, 2025

Thank you @alamb , i will help port the hybrid Filter/RowSelection to the ReadPlan to get better performance without changing any public interfaces and also testing, review.

Thank you so much @zhuqi-lucas -- it is great working with you

@zhuqi-lucas
Copy link
Contributor

Thank you @alamb , i tried the draft POC for adaptive predicate pushdown based read plan PR today, it shows good result:

#7524 (comment)

@alamb
Copy link
Contributor Author

alamb commented May 20, 2025

Update: I am quite happy with how reusing filtered results is coming along, see:

I also had some thoughts on reducing the buffering required here: #6692 (comment)

@alamb
Copy link
Contributor Author

alamb commented May 21, 2025

Status update

The high level plan to improve performance has two parts:

  1. adaptive iteration / representation of filter results (basically Adaptive Parquet Predicate Pushdown Evaluation #5523)
  2. Caching the results of filtering when the column is used in the final projection (basically improve: reuse Arc<dyn Array> in parquet record batch reader. #4864)

My main concern about resuing the result of filtering is memory usage and I think it is important to keep the usage to a minimum -- the current APIs (filter and concat kernels) require a 2x memory overhead so I think it is important to reduce that as well as add some way to limit memory consumption when the filtering result

We also need to implement more sophisticated logic when there are multiple predicates.

My next steps are:

  1. Try and factor the adaptive representation of filter results with @zhuqi-lucas
  2. Explore ways to reduce the memory overhead with caching results (this should help other APIs too).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement Any new improvement worthy of a entry in the changelog parquet Changes to the parquet crate
Projects
None yet
Development

No branches or pull requests

2 participants