Skip to content

Commit 363c3ee

Browse files
committed
Plumbing for GroupsAccumulator
1 parent dd67a8a commit 363c3ee

File tree

4 files changed

+118
-78
lines changed

4 files changed

+118
-78
lines changed

datafusion/core/src/physical_plan/aggregates/row_hash2.rs

Lines changed: 5 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
//!
2020
//! POC demonstration of GroupByHashApproach
2121
22+
use datafusion_physical_expr::GroupsAccumulator;
2223
use log::info;
2324
use std::sync::Arc;
2425
use std::task::{Context, Poll};
@@ -208,9 +209,10 @@ fn create_accumulators(
208209
aggregate_exprs: Vec<Arc<dyn AggregateExpr>>,
209210
) -> Result<Vec<Box<dyn GroupsAccumulator>>> {
210211
info!("Creating accumulator for {aggregate_exprs:#?}");
211-
// This code needs to instantiate the GroupsAccumulator correctly
212-
213-
todo!()
212+
aggregate_exprs
213+
.into_iter()
214+
.map(|agg_expr| agg_expr.create_groups_accumulator())
215+
.collect()
214216
}
215217

216218
impl Stream for GroupedHashAggregateStream2 {
@@ -408,81 +410,6 @@ impl GroupedHashAggregateStream2 {
408410
}
409411
}
410412

411-
/// An implementation of GroupAccumulator is for a single aggregate
412-
/// (e.g. AVG) and stores the state for *all* groups internally
413-
///
414-
/// The logical model is that each group is given a `group_index`
415-
/// assigned and maintained by the hash table.
416-
///
417-
/// group_indexes are contiguous (there aren't gaps), and thus it is
418-
/// expected that each GroupAccumulator will use something like `Vec<..>`
419-
/// to store the group states.
420-
pub trait GroupsAccumulator: Send {
421-
/// updates the accumulator's state from a vector of arrays:
422-
///
423-
/// * `values`: the input arguments to the accumulator
424-
/// * `group_indices`: To which groups do the rows in `values` belong, group id)
425-
/// * `opt_filter`: if present, only update aggregate state using values[i] if opt_filter[i] is true
426-
fn update_batch(
427-
&mut self,
428-
values: &[ArrayRef],
429-
group_indicies: &[usize],
430-
opt_filter: Option<&BooleanArray>,
431-
) -> Result<usize>;
432-
433-
/// Returns the final aggregate value for each group as a single
434-
/// `RecordBatch`
435-
///
436-
/// OPEN QUESTION: Should this method take a "batch_size: usize"
437-
/// and produce a Vec<RecordBatch> as output to avoid 1) requiring
438-
/// one giant intermediate buffer?
439-
///
440-
/// For example, the `SUM` accumulator maintains a running sum,
441-
/// and `evaluate` will produce that running sum as its output for
442-
/// all groups, in group_index order
443-
///
444-
/// This call should be treated as consuming (takes `self`, but it
445-
/// can not be due to keeping it object save) the accumulator is
446-
/// free to release / reset it is internal state after this call
447-
/// and error on any subsequent call.
448-
fn evaluate(&mut self) -> Result<ArrayRef>;
449-
450-
/// Returns any intermediate aggregate state used for multi-phase grouping
451-
///
452-
/// For example, AVG returns two arrays: `SUM` and `COUNT`.
453-
///
454-
/// This call should be treated as consuming (takes `self`, but it
455-
/// can not be due to keeping it object save) the accumulator is
456-
/// free to release / reset it is internal state after this call
457-
/// and error on any subsequent call.
458-
///
459-
/// TODO: consider returning a single Array (which could be a
460-
/// StructArray) instead
461-
fn state(&mut self) -> Result<Vec<ArrayRef>>;
462-
463-
/// merges intermediate state (from `state()`) into this accumulators values
464-
///
465-
/// For some aggregates (such as `SUM`), merge_batch is the same
466-
/// as `update_batch`, but for some aggregrates (such as `COUNT`)
467-
/// the operations differ. See [`Self::state`] for more details on how
468-
/// state is used and merged.
469-
///
470-
/// * `values`: arrays produced from calling `state` previously to the accumulator
471-
/// * `group_indices`: To which groups do the rows in `values` belong, group id)
472-
/// * `opt_filter`: if present, only update aggregate state using values[i] if opt_filter[i] is true
473-
fn merge_batch(
474-
&mut self,
475-
values: &[ArrayRef],
476-
group_indicies: &[usize],
477-
opt_filter: Option<&BooleanArray>,
478-
) -> Result<()>;
479-
480-
/// Amount of memory used to store the state of this
481-
/// accumulator. This function is called once per batch, so it
482-
/// should be O(n) to compute
483-
fn size(&self) -> usize;
484-
}
485-
486413
impl GroupedHashAggregateStream2 {
487414
/// Create an output RecordBatch with all group keys and accumulator states/values
488415
fn create_batch_from_map(&mut self) -> Result<RecordBatch> {
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Vectorized [`GroupsAccumulator`]
19+
20+
use arrow_array::{ArrayRef, BooleanArray};
21+
use datafusion_common::Result;
22+
23+
/// An implementation of `GroupsAccumulator` is for a single aggregate
24+
/// (e.g. AVG) and stores the state for *all* groups internally
25+
///
26+
/// The logical model is that each group is given a `group_index`
27+
/// assigned and maintained by the hash table.
28+
///
29+
/// group_indexes are contiguous (there aren't gaps), and thus it is
30+
/// expected that each `GroupsAccumulator` will use something like `Vec<..>`
31+
/// to store the group states.
32+
pub trait GroupsAccumulator: Send {
33+
/// updates the accumulator's state from a vector of arrays:
34+
///
35+
/// * `values`: the input arguments to the accumulator
36+
/// * `group_indices`: the group index (group id) to which each row in `values` belongs
37+
/// * `opt_filter`: if present, only update aggregate state using values[i] if opt_filter[i] is true
38+
fn update_batch(
39+
&mut self,
40+
values: &[ArrayRef],
41+
group_indicies: &[usize],
42+
opt_filter: Option<&BooleanArray>,
43+
) -> Result<usize>;
44+
45+
/// Returns the final aggregate value for each group as a single
46+
/// array (`ArrayRef`)
47+
///
48+
/// OPEN QUESTION: Should this method take a "batch_size: usize"
49+
/// and produce a Vec<RecordBatch> as output to avoid 1) requiring
50+
/// one giant intermediate buffer?
51+
///
52+
/// For example, the `SUM` accumulator maintains a running sum,
53+
/// and `evaluate` will produce that running sum as its output for
54+
/// all groups, in group_index order
55+
///
56+
/// This call should be treated as consuming (takes `self`, but it
57+
/// cannot, in order to keep the trait object safe); the accumulator is
58+
/// free to release / reset its internal state after this call
59+
/// and error on any subsequent call.
60+
fn evaluate(&mut self) -> Result<ArrayRef>;
61+
62+
/// Returns any intermediate aggregate state used for multi-phase grouping
63+
///
64+
/// For example, AVG returns two arrays: `SUM` and `COUNT`.
65+
///
66+
/// This call should be treated as consuming (takes `self`, but it
67+
/// cannot, in order to keep the trait object safe); the accumulator is
68+
/// free to release / reset its internal state after this call
69+
/// and error on any subsequent call.
70+
///
71+
/// TODO: consider returning a single Array (which could be a
72+
/// StructArray) instead
73+
fn state(&mut self) -> Result<Vec<ArrayRef>>;
74+
75+
/// merges intermediate state (from `state()`) into this accumulator's values
76+
///
77+
/// For some aggregates (such as `SUM`), merge_batch is the same
78+
/// as `update_batch`, but for some aggregates (such as `COUNT`)
79+
/// the operations differ. See [`Self::state`] for more details on how
80+
/// state is used and merged.
81+
///
82+
/// * `values`: arrays previously produced by calling `state` on an accumulator
83+
/// * `group_indices`: the group index (group id) to which each row in `values` belongs
84+
/// * `opt_filter`: if present, only update aggregate state using values[i] if opt_filter[i] is true
85+
fn merge_batch(
86+
&mut self,
87+
values: &[ArrayRef],
88+
group_indicies: &[usize],
89+
opt_filter: Option<&BooleanArray>,
90+
) -> Result<()>;
91+
92+
/// Amount of memory used to store the state of this
93+
/// accumulator. This function is called once per batch, so it
94+
/// should be O(n) to compute
95+
fn size(&self) -> usize;
96+
}

datafusion/physical-expr/src/aggregate/mod.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ use std::any::Any;
2525
use std::fmt::Debug;
2626
use std::sync::Arc;
2727

28+
use self::groups_accumulator::GroupsAccumulator;
29+
2830
pub(crate) mod approx_distinct;
2931
pub(crate) mod approx_median;
3032
pub(crate) mod approx_percentile_cont;
@@ -45,6 +47,7 @@ pub(crate) mod median;
4547
#[macro_use]
4648
pub(crate) mod min_max;
4749
pub mod build_in;
50+
pub(crate) mod groups_accumulator;
4851
mod hyperloglog;
4952
pub mod moving_min_max;
5053
pub mod row_accumulator;
@@ -118,6 +121,18 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {
118121
)))
119122
}
120123

124+
/// Return a specialized [`GroupsAccumulator`] that manages state for all groups
125+
///
126+
/// For maximum performance, [`GroupsAccumulator`] should be
127+
/// implemented rather than [`Accumulator`].
128+
fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
129+
// TODO: The default should implement a wrapper over
130+
// self.create_accumulator
131+
Err(DataFusionError::NotImplemented(format!(
132+
"GroupsAccumulator hasn't been implemented for {self:?} yet"
133+
)))
134+
}
135+
121136
/// Construct an expression that calculates the aggregate in reverse.
122137
/// Typically the "reverse" expression is itself (e.g. SUM, COUNT).
123138
/// For aggregates that do not support calculation in reverse,

datafusion/physical-expr/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,9 @@ pub mod var_provider;
4545
pub mod window;
4646

4747
// reexport this to maintain compatibility with anything that used from_slice previously
48+
pub use aggregate::groups_accumulator::GroupsAccumulator;
4849
pub use aggregate::AggregateExpr;
50+
4951
pub use equivalence::{
5052
project_equivalence_properties, project_ordering_equivalence_properties,
5153
EquivalenceProperties, EquivalentClass, OrderingEquivalenceProperties,

0 commit comments

Comments
 (0)