POC Adaptive predicate push down based read plan #7524

Open · wants to merge 6 commits into main
199 changes: 151 additions & 48 deletions parquet/src/arrow/arrow_reader/mod.rs
@@ -16,15 +16,6 @@
// under the License.

//! Contains reader which reads parquet data into arrow [`RecordBatch`]


pub use crate::arrow::array_reader::RowGroups;
use crate::arrow::array_reader::{build_array_reader, ArrayReader};
use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
@@ -36,6 +27,16 @@
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use crate::file::reader::{ChunkReader, SerializedPageReader};
use crate::schema::types::SchemaDescriptor;
use arrow_array::cast::AsArray;
use arrow_array::{Array, BooleanArray};
use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_buffer::BooleanBufferBuilder;
use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
use arrow_select::filter::filter;
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelector};
use std::collections::VecDeque;
use std::sync::Arc;

pub(crate) use read_plan::{ReadPlan, ReadPlanBuilder};

@@ -799,67 +800,169 @@
impl Iterator for ParquetRecordBatchReader {
}
}

/// Takes the next selection from the selection queue, returning a selection
/// whose selected row count is at most `to_select` (less only if the input
/// selection is exhausted).
fn take_next_selection(
selection: &mut VecDeque<RowSelector>,
to_select: usize,
) -> Option<RowSelection> {
let mut current_selected = 0;
let mut rt = Vec::new();
while let Some(front) = selection.pop_front() {
if front.skip {
rt.push(front);
continue;
}

if current_selected + front.row_count <= to_select {
rt.push(front);
current_selected += front.row_count;
} else {
let select = to_select - current_selected;
let remaining = front.row_count - select;
rt.push(RowSelector::select(select));
selection.push_front(RowSelector::select(remaining));
return Some(rt.into());
}
}
if !rt.is_empty() {
return Some(rt.into());
}
None
}
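
For intuition, here is a usage sketch of `take_next_selection` (hypothetical test code, not part of the PR; it would only compile alongside this module since the function is private). It shows a selector queue being split at the `to_select` boundary:

```rust
#[cfg(test)]
mod take_next_selection_sketch {
    use super::*;
    use std::collections::VecDeque;

    #[test]
    fn splits_at_the_to_select_boundary() {
        // Queue: skip 100 rows, then select 300 rows.
        let mut queue: VecDeque<RowSelector> =
            vec![RowSelector::skip(100), RowSelector::select(300)].into();

        // Ask for at most 200 selected rows.
        let sel = take_next_selection(&mut queue, 200).unwrap();

        // The returned selection is [skip(100), select(200)] ...
        let taken: Vec<RowSelector> = sel.iter().cloned().collect();
        assert_eq!(taken, vec![RowSelector::skip(100), RowSelector::select(200)]);

        // ... and the unconsumed remainder, select(100), was pushed back.
        assert_eq!(queue.pop_front(), Some(RowSelector::select(100)));
    }
}
```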

impl ParquetRecordBatchReader {
/// Returns the next `RecordBatch` from the reader, or `None` if the reader
/// has reached the end of the file.
///
/// Returns `Result<Option<..>>` rather than `Option<Result<..>>` to
/// simplify error handling with `?`
///
/// Uses an adaptive selection strategy to read the next batch of rows. The
/// details of the policy:
///
/// **Window Size**: The adaptive window size equals the configured `batch_size`.
/// For each call, the reader processes up to `batch_size` rows in one logical window.
/// It dynamically decides on a per-subwindow basis whether to use:
/// - **Range-based selection** (default, for higher throughput)
/// - **Bitmap-based selection** (finer granularity when runs are very short)
///
/// **Switching Criterion**: Only when the *average run length* in the subwindow is < 10 rows
/// (i.e. `total_rows < 10 * num_runs`) do we switch from range to bitmap mode.
///
/// **Example Patterns (sub-window examples)**:
/// *Note: these totals refer to a sampled sub-window of the full batch, not the entire `batch_size`.*
///
/// ```text
/// Batch size = 8192 rows (Window)
///
/// 1) Big range runs:
/// [2000 read | 2000 skip | 4192 read]
///        3 runs, avg length ≈ 2730 → **range** mode
///
/// 2) Medium range runs:
/// [200 read | 200 skip | 200 read ...]
/// avg length ≈ 200 → **range** mode
///
/// 3) Dense small runs (many small alternations):
/// [ 5 read | 10 skip | 5 read | 10 skip | 5 read | 5 read ... ]
/// avg ≈ 6.7 < 10 → **bitmap** mode
/// ```
///
/// Returns a `RecordBatch` if any rows are produced, or `None` when no rows remain.
fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
let mut read_records = 0;
let batch_size = self.batch_size();
let mut mask_builder = BooleanBufferBuilder::new(batch_size);

match self.read_plan.selection_mut() {
Some(selection) => {
                while let Some(cur_selection) =
                    take_next_selection(selection, batch_size - read_records)
                {
                    let mut total_read = 0;
                    let mut total_skip = 0;
                    for r in cur_selection.iter() {
                        if r.skip {
                            total_skip += r.row_count;
                        } else {
                            total_read += r.row_count;
                        }
                    }

                    let select_count = cur_selection.iter().count();
                    let total = total_skip + total_read;

                    if total < 10 * select_count {
                        // Runs average < 10 rows: decode the whole sub-window,
                        // then apply a bitmap filter afterwards
                        let mut bitmap_builder = BooleanBufferBuilder::new(total);
alamb (Contributor) commented:

Since many of these read selections come from a BooleanBuffer originally (the result of evaluating an ArrowPredicate), I wonder what you think about potentially avoiding creating RowSelections in the ReadPlan.

Something like

enum DecodeRows {
  /// Decode exactly the rows according to the row selection
  RowSelection(RowSelection),
  /// Decode all rows, and then apply a filter to keep only the ones that matter
  Bitmask(BooleanBuffer),
}

struct ReadPlan {
  // instead of Selection, have DecodeRows
  decode_plan: VecDeque<DecodeRows>,
}

And then the trick would be some sort of heuristic / adaptive approach to turn the result of the ArrowPredicate into a DecodeRows plan

let raw = RowSelection::from_filters(&filters);
Ok(match input_selection {
    Some(selection) => selection.and_then(&raw),
    None => raw,
})

🤔
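
A minimal sketch of what such a heuristic could look like, reusing the PR's average-run-length threshold and the DecodeRows enum sketched above (hypothetical; neither this PR nor the comment implements it):

```rust
// Hypothetical: turn a predicate's BooleanArray result into a DecodeRows plan,
// preferring a bitmask when the selected runs are short on average.
// Assumes the filter has no nulls, so its value buffer is the row mask.
fn plan_from_filter(filter: &BooleanArray) -> DecodeRows {
    let selection = RowSelection::from_filters(&[filter.clone()]);
    let num_runs = selection.iter().count();
    // Same threshold as next_inner: average run length < 10 rows => bitmask
    if num_runs > 0 && filter.len() < 10 * num_runs {
        DecodeRows::Bitmask(filter.values().clone())
    } else {
        DecodeRows::RowSelection(selection)
    }
}
```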

zhuqi-lucas (Contributor, Author) replied on May 19, 2025:

Thank you @alamb for this suggestion; I was considering that approach.

I didn't use it for this PR because here the adaptivity operates at batch level. For example:

With batch size 8192 we adapt within each batch-sized window, and the default is the range selector, so the choice is more accurate at each small window:

8192 (2000 read | 2000 skip | 4192 read) => keep range
8192 (200 read | 200 skip | ... | 200 read), avg = 200 => keep range
8192 (5 read | 10 skip | 10 skip | 5 read | 5 read ...), avg < 10 => switch this 8192-row window to bitmap
...

So we only switch a small window from the default range selector to bitmap, which is not heavy for most cases.

The adaptive window size is the batch size, not the total row group length.

alamb (Contributor) replied:

> 8192 (5 read | 10 skip | 10 skip | 5 read | 5 read ...), avg < 10 => switch this 8192-row window to bitmap

Yeah, what I am thinking is to somehow avoid creating the pattern of 5 read | 10 skip | 5 read | 10 skip ... in the first place.

I haven't thought through exactly how to do that.

zhuqi-lucas (Contributor, Author) replied:

Thank you @alamb for checking; I have also added docs to the latest PR.

                        for select in cur_selection.iter() {
                            bitmap_builder.append_n(select.row_count, !select.skip);
                        }
                        let bitmap = BooleanArray::new(bitmap_builder.finish(), None);
                        self.array_reader.read_records(bitmap.len())?;
                        mask_builder.append_buffer(bitmap.values());
                        read_records += bitmap.true_count();
} else {
                        // Use fast range-based skip/read
for select in cur_selection.iter() {
if select.skip {
let skipped = self.array_reader.skip_records(select.row_count)?;

if skipped != select.row_count {
return Err(general_err!(
"failed to skip rows, expected {}, got {}",
select.row_count,
skipped
));
}
continue;
} else {
                                // Currently, when RowSelectors with row_count = 0 are included,
                                // it is interpreted as end of reader. Fix is to skip such entries.
                                // See https://github.com/apache/arrow-rs/issues/2669
if select.row_count == 0 {
continue;
}
match self.array_reader.read_records(select.row_count)? {
0 => break,
rec => {
mask_builder.append_n(rec, true);
read_records += rec;
}
};
}
}
}
// Stop once ~75% of window size is filled
if read_records >= (batch_size / 4 * 3) {
break;
}
}
}
None => {
                // No selector: read entire batch
                let rec = self.array_reader.read_records(self.batch_size())?;
                mask_builder.append_n(rec, true);
}
};

let array = self.array_reader.consume_batch()?;
        if array.is_empty() {
            return Ok(None);
        }

// Apply mask if used
let final_array = if mask_builder.is_empty() {
array
} else {
let bitmap = BooleanArray::new(mask_builder.finish(), None);
filter(&array, &bitmap)?
};

let struct_arr = final_array.as_struct_opt().expect("StructArray expected");

        // Return only non-empty batches
        Ok(if struct_arr.len() > 0 {
            Some(RecordBatch::from(struct_arr))
} else {
None
})
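
As a standalone illustration of the run-length heuristic described in the `next_inner` doc comment, here is a minimal sketch with hypothetical helper names (not code from this PR), applied to the window patterns shown above:

```rust
/// Decode strategy for one batch-sized sub-window (hypothetical helper).
#[derive(Debug, PartialEq)]
enum Mode {
    Range,  // few long runs: skip/read ranges directly
    Bitmap, // many short runs: decode all rows, then filter with a mask
}

/// `runs` holds (skip, row_count) pairs describing a sub-window.
fn choose_mode(runs: &[(bool, usize)]) -> Mode {
    let total_rows: usize = runs.iter().map(|(_, n)| n).sum();
    // Average run length < 10 rows (total_rows < 10 * num_runs) => bitmap
    if total_rows < 10 * runs.len() {
        Mode::Bitmap
    } else {
        Mode::Range
    }
}

fn main() {
    // 3 long runs, avg length ≈ 2730 => range mode
    let big = [(false, 2000), (true, 2000), (false, 4192)];
    assert_eq!(choose_mode(&big), Mode::Range);

    // 6 short runs, avg length ≈ 6.7 => bitmap mode
    let dense = [(false, 5), (true, 10), (false, 5), (true, 10), (false, 5), (false, 5)];
    assert_eq!(choose_mode(&dense), Mode::Bitmap);
}
```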
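
For context, a hedged sketch of how this adaptive path is exercised from the public API: a RowFilter predicate yields the row selections that `next_inner` consumes, and the configured batch size is the adaptive window. The file name and column layout below are assumptions for illustration:

```rust
use std::fs::File;

use arrow_array::{Int64Array, RecordBatch};
use parquet::arrow::arrow_reader::{
    ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter,
};
use parquet::arrow::ProjectionMask;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical file whose first leaf column is Int64.
    let file = File::open("data.parquet")?;
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;

    // Predicate on the first leaf column: keep rows where value > 10.
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    let predicate = ArrowPredicateFn::new(mask, |batch: RecordBatch| {
        arrow_ord::cmp::gt(batch.column(0), &Int64Array::new_scalar(10))
    });

    let reader = builder
        .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
        .with_batch_size(8192) // the adaptive window size discussed above
        .build()?;

    for batch in reader {
        println!("decoded {} rows", batch?.num_rows());
    }
    Ok(())
}
```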