Skip to content

Commit ddb4fac

Browse files
authored
Refactor PrimitiveGroupValueBuilder to use MaybeNullBufferBuilder (#12623)
* Refactor PrimitiveGroupValueBuilder to use BooleanBuilder * Refactor boolean buffer builder out * tweaks * tweak * simplify * Add specializations for null / non null
1 parent 29b8af2 commit ddb4fac

File tree

4 files changed

+240
-103
lines changed

4 files changed

+240
-103
lines changed

datafusion/physical-plan/src/aggregates/group_values/column.rs

Lines changed: 34 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616
// under the License.
1717

1818
use crate::aggregates::group_values::group_column::{
19-
ByteGroupValueBuilder, GroupColumn, PrimitiveGroupValueBuilder,
19+
ByteGroupValueBuilder, GroupColumn, NonNullPrimitiveGroupValueBuilder,
20+
PrimitiveGroupValueBuilder,
2021
};
2122
use crate::aggregates::group_values::GroupValues;
2223
use ahash::RandomState;
@@ -123,6 +124,26 @@ impl GroupValuesColumn {
123124
}
124125
}
125126

127+
/// Instantiates a [`PrimitiveGroupValueBuilder`] or
128+
/// [`NonNullPrimitiveGroupValueBuilder`] and pushes it into $v
129+
///
130+
/// Arguments:
131+
/// `$v`: the vector to push the new builder into
132+
/// `$nullable`: whether the input can contain nulls
133+
/// `$t`: the primitive type of the builder
134+
///
135+
macro_rules! instantiate_primitive {
136+
($v:expr, $nullable:expr, $t:ty) => {
137+
if $nullable {
138+
let b = PrimitiveGroupValueBuilder::<$t>::new();
139+
$v.push(Box::new(b) as _)
140+
} else {
141+
let b = NonNullPrimitiveGroupValueBuilder::<$t>::new();
142+
$v.push(Box::new(b) as _)
143+
}
144+
};
145+
}
146+
126147
impl GroupValues for GroupValuesColumn {
127148
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> {
128149
let n_rows = cols[0].len();
@@ -133,54 +154,22 @@ impl GroupValues for GroupValuesColumn {
133154
for f in self.schema.fields().iter() {
134155
let nullable = f.is_nullable();
135156
match f.data_type() {
136-
&DataType::Int8 => {
137-
let b = PrimitiveGroupValueBuilder::<Int8Type>::new(nullable);
138-
v.push(Box::new(b) as _)
139-
}
140-
&DataType::Int16 => {
141-
let b = PrimitiveGroupValueBuilder::<Int16Type>::new(nullable);
142-
v.push(Box::new(b) as _)
143-
}
144-
&DataType::Int32 => {
145-
let b = PrimitiveGroupValueBuilder::<Int32Type>::new(nullable);
146-
v.push(Box::new(b) as _)
147-
}
148-
&DataType::Int64 => {
149-
let b = PrimitiveGroupValueBuilder::<Int64Type>::new(nullable);
150-
v.push(Box::new(b) as _)
151-
}
152-
&DataType::UInt8 => {
153-
let b = PrimitiveGroupValueBuilder::<UInt8Type>::new(nullable);
154-
v.push(Box::new(b) as _)
155-
}
156-
&DataType::UInt16 => {
157-
let b = PrimitiveGroupValueBuilder::<UInt16Type>::new(nullable);
158-
v.push(Box::new(b) as _)
159-
}
160-
&DataType::UInt32 => {
161-
let b = PrimitiveGroupValueBuilder::<UInt32Type>::new(nullable);
162-
v.push(Box::new(b) as _)
163-
}
164-
&DataType::UInt64 => {
165-
let b = PrimitiveGroupValueBuilder::<UInt64Type>::new(nullable);
166-
v.push(Box::new(b) as _)
167-
}
157+
&DataType::Int8 => instantiate_primitive!(v, nullable, Int8Type),
158+
&DataType::Int16 => instantiate_primitive!(v, nullable, Int16Type),
159+
&DataType::Int32 => instantiate_primitive!(v, nullable, Int32Type),
160+
&DataType::Int64 => instantiate_primitive!(v, nullable, Int64Type),
161+
&DataType::UInt8 => instantiate_primitive!(v, nullable, UInt8Type),
162+
&DataType::UInt16 => instantiate_primitive!(v, nullable, UInt16Type),
163+
&DataType::UInt32 => instantiate_primitive!(v, nullable, UInt32Type),
164+
&DataType::UInt64 => instantiate_primitive!(v, nullable, UInt64Type),
168165
&DataType::Float32 => {
169-
let b = PrimitiveGroupValueBuilder::<Float32Type>::new(nullable);
170-
v.push(Box::new(b) as _)
166+
instantiate_primitive!(v, nullable, Float32Type)
171167
}
172168
&DataType::Float64 => {
173-
let b = PrimitiveGroupValueBuilder::<Float64Type>::new(nullable);
174-
v.push(Box::new(b) as _)
175-
}
176-
&DataType::Date32 => {
177-
let b = PrimitiveGroupValueBuilder::<Date32Type>::new(nullable);
178-
v.push(Box::new(b) as _)
179-
}
180-
&DataType::Date64 => {
181-
let b = PrimitiveGroupValueBuilder::<Date64Type>::new(nullable);
182-
v.push(Box::new(b) as _)
169+
instantiate_primitive!(v, nullable, Float64Type)
183170
}
171+
&DataType::Date32 => instantiate_primitive!(v, nullable, Date32Type),
172+
&DataType::Date64 => instantiate_primitive!(v, nullable, Date64Type),
184173
&DataType::Utf8 => {
185174
let b = ByteGroupValueBuilder::<i32>::new(OutputType::Utf8);
186175
v.push(Box::new(b) as _)

datafusion/physical-plan/src/aggregates/group_values/group_column.rs

Lines changed: 90 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,11 @@ use arrow::datatypes::GenericBinaryType;
3232
use arrow::datatypes::GenericStringType;
3333
use datafusion_common::utils::proxy::VecAllocExt;
3434

35+
use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder;
36+
use datafusion_physical_expr_common::binary_map::{OutputType, INITIAL_BUFFER_CAPACITY};
3537
use std::sync::Arc;
3638
use std::vec;
3739

38-
use datafusion_physical_expr_common::binary_map::{OutputType, INITIAL_BUFFER_CAPACITY};
39-
4040
/// Trait for storing a single column of group values in [`GroupValuesColumn`]
4141
///
4242
/// Implementations of this trait store an in-progress collection of group values
@@ -47,6 +47,8 @@ use datafusion_physical_expr_common::binary_map::{OutputType, INITIAL_BUFFER_CAP
4747
pub trait GroupColumn: Send + Sync {
4848
/// Returns equal if the row stored in this builder at `lhs_row` is equal to
4949
/// the row in `array` at `rhs_row`
50+
///
51+
/// Note that this comparison returns true if both elements are NULL
5052
fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool;
5153
/// Appends the row at `row` in `array` to this builder
5254
fn append_val(&mut self, array: &ArrayRef, row: usize);
@@ -61,61 +63,96 @@ pub trait GroupColumn: Send + Sync {
6163
fn take_n(&mut self, n: usize) -> ArrayRef;
6264
}
6365

64-
/// An implementation of [`GroupColumn`] for primitive types.
65-
pub struct PrimitiveGroupValueBuilder<T: ArrowPrimitiveType> {
66+
/// An implementation of [`GroupColumn`] for primitive values which are known to have no nulls
67+
#[derive(Debug)]
68+
pub struct NonNullPrimitiveGroupValueBuilder<T: ArrowPrimitiveType> {
6669
group_values: Vec<T::Native>,
67-
nulls: Vec<bool>,
68-
/// whether the array contains at least one null, for fast non-null path
69-
has_null: bool,
70-
/// Can the input array contain nulls?
71-
nullable: bool,
7270
}
7371

74-
impl<T> PrimitiveGroupValueBuilder<T>
72+
impl<T> NonNullPrimitiveGroupValueBuilder<T>
7573
where
7674
T: ArrowPrimitiveType,
7775
{
78-
pub fn new(nullable: bool) -> Self {
76+
pub fn new() -> Self {
7977
Self {
8078
group_values: vec![],
81-
nulls: vec![],
82-
has_null: false,
83-
nullable,
8479
}
8580
}
8681
}
8782

88-
impl<T: ArrowPrimitiveType> GroupColumn for PrimitiveGroupValueBuilder<T> {
83+
impl<T: ArrowPrimitiveType> GroupColumn for NonNullPrimitiveGroupValueBuilder<T> {
8984
fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool {
90-
// non-null fast path
91-
// both non-null
92-
if !self.nullable {
93-
return self.group_values[lhs_row]
94-
== array.as_primitive::<T>().value(rhs_row);
95-
}
85+
// the input is known to have no nulls
86+
self.group_values[lhs_row] == array.as_primitive::<T>().value(rhs_row)
87+
}
9688

97-
// lhs is non-null
98-
if self.nulls[lhs_row] {
99-
if array.is_null(rhs_row) {
100-
return false;
101-
}
89+
fn append_val(&mut self, array: &ArrayRef, row: usize) {
90+
// input can't possibly have nulls, so don't worry about them
91+
self.group_values.push(array.as_primitive::<T>().value(row))
92+
}
93+
94+
fn len(&self) -> usize {
95+
self.group_values.len()
96+
}
97+
98+
fn size(&self) -> usize {
99+
self.group_values.allocated_size()
100+
}
101+
102+
fn build(self: Box<Self>) -> ArrayRef {
103+
let Self { group_values } = *self;
102104

103-
return self.group_values[lhs_row]
104-
== array.as_primitive::<T>().value(rhs_row);
105+
let nulls = None;
106+
107+
Arc::new(PrimitiveArray::<T>::new(
108+
ScalarBuffer::from(group_values),
109+
nulls,
110+
))
111+
}
112+
113+
fn take_n(&mut self, n: usize) -> ArrayRef {
114+
let first_n = self.group_values.drain(0..n).collect::<Vec<_>>();
115+
let first_n_nulls = None;
116+
117+
Arc::new(PrimitiveArray::<T>::new(
118+
ScalarBuffer::from(first_n),
119+
first_n_nulls,
120+
))
121+
}
122+
}
123+
124+
/// An implementation of [`GroupColumn`] for primitive values which may have nulls
125+
#[derive(Debug)]
126+
pub struct PrimitiveGroupValueBuilder<T: ArrowPrimitiveType> {
127+
group_values: Vec<T::Native>,
128+
nulls: MaybeNullBufferBuilder,
129+
}
130+
131+
impl<T> PrimitiveGroupValueBuilder<T>
132+
where
133+
T: ArrowPrimitiveType,
134+
{
135+
pub fn new() -> Self {
136+
Self {
137+
group_values: vec![],
138+
nulls: MaybeNullBufferBuilder::new(),
105139
}
140+
}
141+
}
106142

107-
array.is_null(rhs_row)
143+
impl<T: ArrowPrimitiveType> GroupColumn for PrimitiveGroupValueBuilder<T> {
144+
fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool {
145+
self.nulls.is_null(lhs_row) == array.is_null(rhs_row)
146+
&& self.group_values[lhs_row] == array.as_primitive::<T>().value(rhs_row)
108147
}
109148

110149
fn append_val(&mut self, array: &ArrayRef, row: usize) {
111-
if self.nullable && array.is_null(row) {
150+
if array.is_null(row) {
151+
self.nulls.append(true);
112152
self.group_values.push(T::default_value());
113-
self.nulls.push(false);
114-
self.has_null = true;
115153
} else {
116-
let elem = array.as_primitive::<T>().value(row);
117-
self.group_values.push(elem);
118-
self.nulls.push(true);
154+
self.nulls.append(false);
155+
self.group_values.push(array.as_primitive::<T>().value(row));
119156
}
120157
}
121158

@@ -128,32 +165,27 @@ impl<T: ArrowPrimitiveType> GroupColumn for PrimitiveGroupValueBuilder<T> {
128165
}
129166

130167
fn build(self: Box<Self>) -> ArrayRef {
131-
if self.has_null {
132-
Arc::new(PrimitiveArray::<T>::new(
133-
ScalarBuffer::from(self.group_values),
134-
Some(NullBuffer::from(self.nulls)),
135-
))
136-
} else {
137-
Arc::new(PrimitiveArray::<T>::new(
138-
ScalarBuffer::from(self.group_values),
139-
None,
140-
))
141-
}
168+
let Self {
169+
group_values,
170+
nulls,
171+
} = *self;
172+
173+
let nulls = nulls.build();
174+
175+
Arc::new(PrimitiveArray::<T>::new(
176+
ScalarBuffer::from(group_values),
177+
nulls,
178+
))
142179
}
143180

144181
fn take_n(&mut self, n: usize) -> ArrayRef {
145-
if self.has_null {
146-
let first_n = self.group_values.drain(0..n).collect::<Vec<_>>();
147-
let first_n_nulls = self.nulls.drain(0..n).collect::<Vec<_>>();
148-
Arc::new(PrimitiveArray::<T>::new(
149-
ScalarBuffer::from(first_n),
150-
Some(NullBuffer::from(first_n_nulls)),
151-
))
152-
} else {
153-
let first_n = self.group_values.drain(0..n).collect::<Vec<_>>();
154-
self.nulls.truncate(self.nulls.len() - n);
155-
Arc::new(PrimitiveArray::<T>::new(ScalarBuffer::from(first_n), None))
156-
}
182+
let first_n = self.group_values.drain(0..n).collect::<Vec<_>>();
183+
let first_n_nulls = self.nulls.take_n(n);
184+
185+
Arc::new(PrimitiveArray::<T>::new(
186+
ScalarBuffer::from(first_n),
187+
first_n_nulls,
188+
))
157189
}
158190
}
159191

datafusion/physical-plan/src/aggregates/group_values/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use bytes::GroupValuesByes;
3838
use datafusion_physical_expr::binary_map::OutputType;
3939

4040
mod group_column;
41+
mod null_builder;
4142

4243
/// Stores the group values during hash aggregation.
4344
///

0 commit comments

Comments
 (0)