Skip to content

Commit 4dd1d69

Browse files
committed
complete
1 parent bf504de commit 4dd1d69

File tree

1 file changed

+56
-43
lines changed

1 file changed

+56
-43
lines changed

parquet/src/arrow/array_reader/cached.rs

Lines changed: 56 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ pub(crate) struct CachedPredicateResultBuilder {
4343
/// <https://github.com/apache/arrow-rs/issues/6692>
4444
in_progress_arrays: Vec<Box<dyn InProgressArray>>,
4545
filters: Vec<BooleanArray>,
46-
batch_size: usize,
4746
}
4847

4948
impl CachedPredicateResultBuilder {
@@ -79,7 +78,6 @@ impl CachedPredicateResultBuilder {
7978
Self {
8079
in_progress_arrays,
8180
filters: vec![],
82-
batch_size,
8381
}
8482
};
8583
}
@@ -106,7 +104,6 @@ impl CachedPredicateResultBuilder {
106104
Self {
107105
in_progress_arrays,
108106
filters: vec![],
109-
batch_size,
110107
}
111108
}
112109

@@ -135,7 +132,6 @@ impl CachedPredicateResultBuilder {
135132
let Self {
136133
in_progress_arrays,
137134
filters,
138-
batch_size: _,
139135
} = self;
140136

141137
let new_selection = RowSelection::from_filters(&filters);
@@ -316,29 +312,36 @@ trait InProgressArray {
316312
/// results is not possible.
317313
fn create_in_progress_array(
318314
in_projection: bool,
319-
_data_type: &DataType,
315+
data_type: &DataType,
320316
batch_size: usize,
321317
) -> Box<dyn InProgressArray> {
322-
if in_projection {
323-
Box::new(InProgressArrayImpl::new(batch_size, GenericArrayBuilder::new()))
324-
325-
} else {
318+
if !in_projection {
326319
// column is not in the projection, so no need to cache
327-
Box::new(NoOpInProgressArray::new())
328-
320+
return Box::new(NoOpInProgressArray::new())
321+
}
322+
323+
match data_type {
324+
DataType::Utf8View => {
325+
Box::new(InProgressArrayImpl::new(batch_size,
326+
InProgressStringViewBuilder::new_with_capacity(batch_size)))
327+
}
328+
_ => {
329+
// TODO: implement builders for more specific types; until then fall back to the generic builder
330+
Box::new(InProgressArrayImpl::new(batch_size, GenericArrayBuilder::new()))
331+
}
329332
}
330333
}
331334

332335

333336
/// A builder for creating an InProgressArray. Trait so we can use Dyn dispatch
334337
trait InProgressArrayBuilder {
335338
/// Appends all values of the array to the in progress array
336-
///
339+
///
337340
/// TODO: potentially pass in filter and unfiltered array to avoid a copy
338341
fn append(&mut self, array: ArrayRef);
339342

340343
/// Finalizes the in progress array, resetting state and returning the new array.
341-
///
344+
///
342345
/// Returns None if there are no rows in progress
343346
fn try_build(&mut self) -> Result<Option<ArrayRef>>;
344347
}
@@ -363,7 +366,7 @@ impl <B: InProgressArrayBuilder> InProgressArrayImpl<B> {
363366
inner,
364367
}
365368
}
366-
369+
367370
/// Combines all arrays in `current` into a new array in `finished` and returns the
368371
/// number of rows in the array added to `self.finished`
369372
fn finish_current(&mut self) -> Result<usize> {
@@ -372,11 +375,11 @@ impl <B: InProgressArrayBuilder> InProgressArrayImpl<B> {
372375
return Ok(0);
373376
}
374377
let Some(new_array) = self.inner.try_build()? else {
375-
// no rows in current
378+
// no rows in current
376379
self.current_rows = 0;
377380
return Ok(0);
378381
};
379-
382+
380383
let num_rows = new_array.len();
381384
self.finished.push(new_array);
382385
self.current_rows = 0;
@@ -442,7 +445,7 @@ impl NoOpInProgressArray {
442445
}
443446

444447
/// Implements a GenericArrayBuilder used for any array type by using buffering and `concat`
445-
///
448+
///
446449
/// TODO avoid this by using type specific array builders
447450
struct GenericArrayBuilder {
448451
arrays: Vec<ArrayRef>
@@ -480,38 +483,42 @@ impl InProgressArrayBuilder for GenericArrayBuilder {
480483
}
481484

482485

483-
484-
485486
/// An implementation of InProgressArrayBuilder for StringViewArray
486487
/// that knows how to efficiently and incrementally concatenate arrays
487-
struct StringViewInProgressArray {
488+
///
489+
/// TODO: move this into StringViewBuilder (probably by adding an `append_array` method to it)
490+
struct InProgressStringViewBuilder {
488491
new_views: Vec<u128>,
489492
null_buffer_builder: NullBufferBuilder,
490493
buffers: Vec<Buffer>,
494+
initial_capacity: usize,
491495
}
492496

493-
impl StringViewInProgressArray {
494-
fn new_with_capacity(num_rows: usize) -> Self {
495-
todo!()
496-
}
497+
impl InProgressStringViewBuilder {
498+
fn new_with_capacity(initial_capacity: usize) -> Self {
499+
Self {
500+
new_views: Vec::with_capacity(initial_capacity),
501+
null_buffer_builder: NullBufferBuilder::new(initial_capacity),
502+
buffers: Vec::with_capacity(100), // TODO better estimate of number of buffers
503+
initial_capacity,
504+
}
505+
}
497506
}
498507

499-
fn concat_string_view_arrays(arrays: &[ArrayRef]) -> Result<ArrayRef> {
500-
// Special case for StringViewArray inspired by DataFusion:
501-
// https://github.com/apache/datafusion/blob/9d2f04996604e709ee440b65f41e7b882f50b788/datafusion/physical-plan/src/coalesce/mod.rs#L222-L221
502-
503-
let total_rows = arrays.iter().map(|a| a.len()).sum();
504-
let mut new_views = Vec::with_capacity(total_rows);
505-
let mut null_buffer_builder = NullBufferBuilder::new(total_rows);
506-
let mut buffers = Vec::with_capacity(100); // better estimate of buffer sizes
507-
508-
// copy each input array into the to the output, one at a time
509-
for array in arrays.iter() {
508+
impl InProgressArrayBuilder for InProgressStringViewBuilder {
509+
fn append(&mut self, array: ArrayRef) {
510+
// Special case for StringViewArray inspired by DataFusion:
511+
// https://github.com/apache/datafusion/blob/9d2f04996604e709ee440b65f41e7b882f50b788/datafusion/physical-plan/src/coalesce/mod.rs#L222-L221
510512
let num_rows = array.len();
511513
if num_rows == 0 {
512-
continue;
514+
return; // nothing to do
513515
}
514516
let array = array.as_string_view();
517+
518+
let null_buffer_builder = &mut self.null_buffer_builder;
519+
let buffers = &mut self.buffers;
520+
let new_views = &mut self.new_views;
521+
515522
// Copy nulls
516523
if let Some(nulls) = array.nulls() {
517524
null_buffer_builder.append_buffer(nulls);
@@ -550,7 +557,7 @@ fn concat_string_view_arrays(arrays: &[ArrayRef]) -> Result<ArrayRef> {
550557
// buffer as well as updating the views
551558
let mut new_buffer: Vec<u8> = Vec::with_capacity(ideal_buffer_size);
552559
let new_buffer_index = buffers.len() as u32; // making one new buffer
553-
// Update any views that point to the old buffers.
560+
// Update any views that point to the old buffers.
554561
for v in new_views[starting_view..].iter_mut() {
555562
let view_len = *v as u32;
556563
// if view_len is 12 or less, data is inlined and doesn't need an update
@@ -572,12 +579,18 @@ fn concat_string_view_arrays(arrays: &[ArrayRef]) -> Result<ArrayRef> {
572579
}
573580
}
574581

575-
// Form output array
576-
let nulls = null_buffer_builder.finish();
577-
// safety: we know what we are doing above
578-
let new_array =
579-
unsafe { StringViewArray::new_unchecked(ScalarBuffer::from(new_views), buffers, nulls) };
580-
Ok(Arc::new(new_array))
582+
fn try_build(&mut self) -> Result<Option<ArrayRef>> {
583+
// Form output array
584+
let nulls = self.null_buffer_builder.finish();
585+
let new_views = std::mem::replace(&mut self.new_views, Vec::with_capacity(self.initial_capacity));
586+
let buffers= std::mem::replace(&mut self.buffers, Vec::with_capacity(100)); // TODO better buffer estimate
587+
588+
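// SAFETY (review note): presumably sound because `append` only copies views from existing StringViewArrays and rewrites their buffer indexes to refer to `self.buffers` — TODO confirm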
589+
let new_array =
590+
unsafe { StringViewArray::new_unchecked(ScalarBuffer::from(new_views), buffers, nulls) };
591+
592+
Ok(Some(Arc::new(new_array)))
593+
}
581594
}
582595

583596
/// return the size required for buffers to hold all strings

0 commit comments

Comments
 (0)