Skip to content

Alamb/sketch boolean array #7540

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 38 additions & 40 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@ use crate::file::reader::{ChunkReader, SerializedPageReader};
use crate::schema::types::SchemaDescriptor;

pub(crate) use read_plan::{ReadPlan, ReadPlanBuilder};
use crate::arrow::arrow_reader::read_step::ReadStep;

mod filter;
mod read_plan;
mod selection;
pub mod statistics;
pub mod read_step;

/// Builder for constructing Parquet readers that decode into [Apache Arrow]
/// arrays.
Expand Down Expand Up @@ -808,54 +810,50 @@ impl ParquetRecordBatchReader {
/// Returns `Result<Option<..>>` rather than `Option<Result<..>>` to
/// simplify error handling with `?`
fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
let mut end_of_stream = false;
let mut read_records = 0;
let batch_size = self.batch_size();
match self.read_plan.selection_mut() {
Some(selection) => {
while read_records < batch_size && !selection.is_empty() {
let front = selection.pop_front().unwrap();
if front.skip {
let skipped = self.array_reader.skip_records(front.row_count)?;

if skipped != front.row_count {
return Err(general_err!(
"failed to skip rows, expected {}, got {}",
front.row_count,
skipped
));
}
continue;
}
while read_records < batch_size {
let Some(step) = self.read_plan.next() else {
end_of_stream = true;
break;
};

// Currently, when RowSelectors with row_count = 0 are included, they are interpreted as the end of the reader.
// The fix is to skip such entries. See https://github.com/apache/arrow-rs/issues/2669
if front.row_count == 0 {
continue;

match step {
ReadStep::Skip(row_count) => {
let skipped = self.array_reader.skip_records(row_count)?;

if skipped != row_count {
return Err(general_err!(
"Internal Error: failed to skip rows, expected {row_count}, got {skipped}",
));
}
}
ReadStep::Read(row_count) => {
let read = self.array_reader.read_records(row_count)?;
if read == 0 {
end_of_stream = true;
break;
}

// try to read record
let need_read = batch_size - read_records;
let to_read = match front.row_count.checked_sub(need_read) {
Some(remaining) if remaining != 0 => {
// If the page row count is less than batch_size, we must set the batch size to the page row count.
// The `remaining != 0` check is added to avoid an infinite loop.
selection.push_front(RowSelector::select(remaining));
need_read
}
_ => front.row_count,
};
match self.array_reader.read_records(to_read)? {
0 => break,
rec => read_records += rec,
};
read_records += read
}
}
None => {
self.array_reader.read_records(batch_size)?;
}
};
ReadStep::Mask{..} => {
todo!();
}
};
}

let array = self.array_reader.consume_batch()?;

// Reader should read exactly `batch_size` records except for last batch
if !end_of_stream && (read_records != batch_size) {
return Err(general_err!(
"Internal Error: unexpected read count. Expected {batch_size} got {read_records}"
));
}

let struct_array = array.as_struct_opt().ok_or_else(|| {
ArrowError::ParquetError("Struct array reader should return struct array".to_string())
})?;
Expand Down
Loading
Loading