Skip to content

Commit ed0949e

Browse files
authored
Refactor ByteGroupValueBuilder to use MaybeNullBufferBuilder (#12681)
1 parent 663ab76 commit ed0949e

File tree

1 file changed

+30
-65
lines changed

1 file changed

+30
-65
lines changed

datafusion/physical-plan/src/aggregates/group_values/group_column.rs

Lines changed: 30 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -15,24 +15,21 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use arrow::array::BooleanBufferBuilder;
1918
use arrow::array::BufferBuilder;
2019
use arrow::array::GenericBinaryArray;
2120
use arrow::array::GenericStringArray;
2221
use arrow::array::OffsetSizeTrait;
2322
use arrow::array::PrimitiveArray;
2423
use arrow::array::{Array, ArrayRef, ArrowPrimitiveType, AsArray};
25-
use arrow::buffer::NullBuffer;
2624
use arrow::buffer::OffsetBuffer;
2725
use arrow::buffer::ScalarBuffer;
28-
use arrow::datatypes::ArrowNativeType;
2926
use arrow::datatypes::ByteArrayType;
3027
use arrow::datatypes::DataType;
3128
use arrow::datatypes::GenericBinaryType;
32-
use arrow::datatypes::GenericStringType;
3329
use datafusion_common::utils::proxy::VecAllocExt;
3430

3531
use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder;
32+
use arrow_array::types::GenericStringType;
3633
use datafusion_physical_expr_common::binary_map::{OutputType, INITIAL_BUFFER_CAPACITY};
3734
use std::sync::Arc;
3835
use std::vec;
@@ -190,6 +187,12 @@ impl<T: ArrowPrimitiveType> GroupColumn for PrimitiveGroupValueBuilder<T> {
190187
}
191188

192189
/// An implementation of [`GroupColumn`] for binary and utf8 types.
190+
///
191+
/// Stores a collection of binary or utf8 group values in a single buffer
192+
/// in a way that allows:
193+
///
194+
/// 1. Efficient comparison of incoming rows to existing rows
195+
/// 2. Efficient construction of the final output array
193196
pub struct ByteGroupValueBuilder<O>
194197
where
195198
O: OffsetSizeTrait,
@@ -201,8 +204,8 @@ where
201204
/// stored in the range `offsets[i]..offsets[i+1]` in `buffer`. Null values
202205
/// are stored as a zero length string.
203206
offsets: Vec<O>,
204-
/// Null indexes in offsets, if `i` is in nulls, `offsets[i]` should be equals to `offsets[i+1]`
205-
nulls: Vec<usize>,
207+
/// Nulls
208+
nulls: MaybeNullBufferBuilder,
206209
}
207210

208211
impl<O> ByteGroupValueBuilder<O>
@@ -214,7 +217,7 @@ where
214217
output_type,
215218
buffer: BufferBuilder::new(INITIAL_BUFFER_CAPACITY),
216219
offsets: vec![O::default()],
217-
nulls: vec![],
220+
nulls: MaybeNullBufferBuilder::new(),
218221
}
219222
}
220223

@@ -224,40 +227,33 @@ where
224227
{
225228
let arr = array.as_bytes::<B>();
226229
if arr.is_null(row) {
227-
self.nulls.push(self.len());
230+
self.nulls.append(true);
228231
// nulls need a zero length in the offset buffer
229232
let offset = self.buffer.len();
230-
231233
self.offsets.push(O::usize_as(offset));
232-
return;
234+
} else {
235+
self.nulls.append(false);
236+
let value: &[u8] = arr.value(row).as_ref();
237+
self.buffer.append_slice(value);
238+
self.offsets.push(O::usize_as(self.buffer.len()));
233239
}
234-
235-
let value: &[u8] = arr.value(row).as_ref();
236-
self.buffer.append_slice(value);
237-
self.offsets.push(O::usize_as(self.buffer.len()));
238240
}
239241

240242
fn equal_to_inner<B>(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool
241243
where
242244
B: ByteArrayType,
243245
{
244-
// Handle nulls
245-
let is_lhs_null = self.nulls.iter().any(|null_idx| *null_idx == lhs_row);
246246
let arr = array.as_bytes::<B>();
247-
if is_lhs_null {
248-
return arr.is_null(rhs_row);
249-
} else if arr.is_null(rhs_row) {
250-
return false;
251-
}
247+
self.nulls.is_null(lhs_row) == arr.is_null(rhs_row)
248+
&& self.value(lhs_row) == (arr.value(rhs_row).as_ref() as &[u8])
249+
}
252250

253-
let arr = array.as_bytes::<B>();
254-
let rhs_elem: &[u8] = arr.value(rhs_row).as_ref();
255-
let rhs_elem_len = arr.value_length(rhs_row).as_usize();
256-
debug_assert_eq!(rhs_elem_len, rhs_elem.len());
257-
let l = self.offsets[lhs_row].as_usize();
258-
let r = self.offsets[lhs_row + 1].as_usize();
259-
let existing_elem = unsafe { self.buffer.as_slice().get_unchecked(l..r) };
260-
rhs_elem == existing_elem
251+
/// return the current value of the specified row irrespective of null
252+
pub fn value(&self, row: usize) -> &[u8] {
253+
let l = self.offsets[row].as_usize();
254+
let r = self.offsets[row + 1].as_usize();
255+
// Safety: the offsets are constructed correctly and never decrease
256+
unsafe { self.buffer.as_slice().get_unchecked(l..r) }
261257
}
262258
}
263259

@@ -325,18 +321,7 @@ where
325321
nulls,
326322
} = *self;
327323

328-
let null_buffer = if nulls.is_empty() {
329-
None
330-
} else {
331-
// Only make a `NullBuffer` if there was a null value
332-
let num_values = offsets.len() - 1;
333-
let mut bool_builder = BooleanBufferBuilder::new(num_values);
334-
bool_builder.append_n(num_values, true);
335-
nulls.into_iter().for_each(|null_index| {
336-
bool_builder.set_bit(null_index, false);
337-
});
338-
Some(NullBuffer::from(bool_builder.finish()))
339-
};
324+
let null_buffer = nulls.build();
340325

341326
// SAFETY: the offsets were constructed correctly in `insert_if_new` --
342327
// monotonically increasing, overflows were checked.
@@ -353,9 +338,9 @@ where
353338
// SAFETY:
354339
// 1. the offsets were constructed safely
355340
//
356-
// 2. we asserted the input arrays were all the correct type and
357-
// thus since all the values that went in were valid (e.g. utf8)
358-
// so are all the values that come out
341+
// 2. the input arrays were all the correct type and thus since
342+
// all the values that went in were valid (e.g. utf8) so are all
343+
// the values that come out
359344
Arc::new(unsafe {
360345
GenericStringArray::new_unchecked(offsets, values, null_buffer)
361346
})
@@ -366,27 +351,7 @@ where
366351

367352
fn take_n(&mut self, n: usize) -> ArrayRef {
368353
debug_assert!(self.len() >= n);
369-
370-
let null_buffer = if self.nulls.is_empty() {
371-
None
372-
} else {
373-
// Only make a `NullBuffer` if there was a null value
374-
let mut bool_builder = BooleanBufferBuilder::new(n);
375-
bool_builder.append_n(n, true);
376-
377-
let mut new_nulls = vec![];
378-
self.nulls.iter().for_each(|null_index| {
379-
if *null_index < n {
380-
bool_builder.set_bit(*null_index, false);
381-
} else {
382-
new_nulls.push(null_index - n);
383-
}
384-
});
385-
386-
self.nulls = new_nulls;
387-
Some(NullBuffer::from(bool_builder.finish()))
388-
};
389-
354+
let null_buffer = self.nulls.take_n(n);
390355
let first_remaining_offset = O::as_usize(self.offsets[n]);
391356

392357
// Given offests like [0, 2, 4, 5] and n = 1, we expect to get

0 commit comments

Comments
 (0)