Commit 817c74e

AggregationIterators and Generating Result Projection
1 parent b5549a4 commit 817c74e

4 files changed, +73 -9 lines changed

docs/aggregations/AggregationIterator.md

+52-4
@@ -1,6 +1,6 @@
1-
# AggregationIterators
1+
# AggregationIterator
22

3-
`AggregationIterator` is an [abstraction](#contract) of [aggregation iterators](#implementations) of [UnsafeRow](../UnsafeRow.md)s.
3+
`AggregationIterator` is an [abstraction](#contract) of [aggregation iterators](#implementations) (of [UnsafeRow](../UnsafeRow.md)s) that are used by [aggregate physical operators](../physical-operators/BaseAggregateExec.md) to process rows in a partition.
44

55
```scala
66
abstract class AggregationIterator(...)
@@ -112,13 +112,61 @@ Aggregate Iterators | Operations
112112
`SortBasedAggregationIterator` | <ul><li>[next element](SortBasedAggregationIterator.md#next)<li>[outputForEmptyGroupingKeyWithoutInput](SortBasedAggregationIterator.md#outputForEmptyGroupingKeyWithoutInput)</ul>
113113
`TungstenAggregationIterator` | <ul><li>[next element](TungstenAggregationIterator.md#next)<li>[outputForEmptyGroupingKeyWithoutInput](TungstenAggregationIterator.md#outputForEmptyGroupingKeyWithoutInput)</ul>
114114

115-
### generateResultProjection { #generateResultProjection }
115+
### Generating Result Projection { #generateResultProjection }
116116

117117
```scala
118118
generateResultProjection(): (UnsafeRow, InternalRow) => UnsafeRow
119119
```
120120

121-
`generateResultProjection`...FIXME
121+
??? note "TungstenAggregationIterator"
122+
[TungstenAggregationIterator](TungstenAggregationIterator.md) overrides [generateResultProjection](TungstenAggregationIterator.md#generateResultProjection) for partial aggregation (non-`Final` and non-`Complete` aggregate modes).
123+
124+
`generateResultProjection` branches off based on the [aggregate modes](../expressions/AggregateExpression.md#mode) of the [aggregates](#aggregateExpressions):
125+
126+
1. [Final and Complete](#generateResultProjection-final-complete)
127+
1. [Partial and PartialMerge](#generateResultProjection-partial-partialmerge)
128+
1. [No modes](#generateResultProjection-no-modes)
129+
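The branching above can be sketched in plain Scala. This is a simplified model, not Spark's code: the four mode objects mirror the mode names used on this page, while `ModeDispatchSketch`, `ResultProjectionBranch` and `branchFor` are hypothetical names introduced for illustration. It assumes the Final/Complete check takes precedence over the Partial/PartialMerge check, following the order of the list.

```scala
object ModeDispatchSketch {
  // Toy stand-ins for the aggregate modes named on this page.
  sealed trait AggregateMode
  case object Partial extends AggregateMode
  case object PartialMerge extends AggregateMode
  case object Final extends AggregateMode
  case object Complete extends AggregateMode

  // Hypothetical labels for the three branches of generateResultProjection.
  sealed trait ResultProjectionBranch
  case object FinalOrComplete extends ResultProjectionBranch
  case object PartialOrPartialMerge extends ResultProjectionBranch
  case object NoModes extends ResultProjectionBranch

  // Picks a branch from the distinct modes of the aggregate expressions.
  // Assumption: Final/Complete is checked first, then Partial/PartialMerge.
  def branchFor(modes: Seq[AggregateMode]): ResultProjectionBranch = {
    val distinctModes = modes.distinct
    if (distinctModes.contains(Final) || distinctModes.contains(Complete)) FinalOrComplete
    else if (distinctModes.contains(Partial) || distinctModes.contains(PartialMerge)) PartialOrPartialMerge
    else NoModes
  }
}
```

An empty list of modes (an aggregation with grouping keys only) falls through to the last branch in this model.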
130+
!!! note "Main Differences between Aggregate Modes"
131+
132+
Final and Complete | Partial and PartialMerge
133+
-------------------|-------------------------
134+
Focus on [DeclarativeAggregate](../expressions/DeclarativeAggregate.md)s to execute the [evaluateExpression](../expressions/DeclarativeAggregate.md#evaluateExpression)s (while the [allImperativeAggregateFunctions](#allImperativeAggregateFunctions) simply [eval](../expressions/Expression.md#eval)) | Focus on [TypedImperativeAggregate](../expressions/TypedImperativeAggregate.md)s so they can [serializeAggregateBufferInPlace](../expressions/TypedImperativeAggregate.md#serializeAggregateBufferInPlace)
135+
An [UnsafeProjection](../expressions/UnsafeProjection.md) binds the [resultExpressions](#resultExpressions) to the following:<ol><li>the [groupingAttributes](#groupingAttributes)<li>the [aggregateAttributes](#aggregateAttributes)</ol> | An [UnsafeProjection](../expressions/UnsafeProjection.md) binds the [groupingAttributes](#groupingAttributes) and [bufferAttributes](#bufferAttributes) to the following (the same attributes again, this time as the input schema):<ol><li>the [groupingAttributes](#groupingAttributes)<li>the [bufferAttributes](#bufferAttributes)</ol>
136+
Uses an [UnsafeProjection](../expressions/UnsafeProjection.md) to generate an [UnsafeRow](../UnsafeRow.md) for the following:<ol><li>the current grouping key<li>the aggregate results</ol> | Uses an [UnsafeProjection](../expressions/UnsafeProjection.md) to generate an [UnsafeRow](../UnsafeRow.md) for the following:<ol><li>the current grouping key<li>the current buffer</ol>
137+
138+
#### Final and Complete { #generateResultProjection-final-complete }
139+
140+
For [Final](../expressions/AggregateExpression.md#Final) or [Complete](../expressions/AggregateExpression.md#Complete) modes, `generateResultProjection` does the following:
141+
142+
1. Collects the [evaluateExpression](../expressions/DeclarativeAggregate.md#evaluateExpression)s (the expressions that evaluate the final value) of the [DeclarativeAggregate](../expressions/DeclarativeAggregate.md)s and `NoOp`s for the other [AggregateFunction](../expressions/AggregateFunction.md)s among the [aggregateFunctions](#aggregateFunctions). `generateResultProjection` preserves the order of the evaluation expressions and `NoOp`s (so the `i`th aggregate function uses the `i`th evaluation expression).
143+
1. Executes the [newMutableProjection](#newMutableProjection) with the evaluation expressions and the [aggBufferAttributes](../expressions/AggregateFunction.md#aggBufferAttributes) of the [aggregateFunctions](#aggregateFunctions) to create a [MutableProjection](../expressions/MutableProjection.md)
144+
1. Requests the `MutableProjection` to [store the aggregate results](../expressions/MutableProjection.md#target) (of all the [DeclarativeAggregate](../expressions/DeclarativeAggregate.md)s) in a `SpecificInternalRow`
145+
1. [Creates an UnsafeProjection](../expressions/UnsafeProjection.md#create) for the [resultExpressions](#resultExpressions) and the [groupingAttributes](#groupingAttributes) with the [aggregateAttributes](#aggregateAttributes) (for the input schema)
146+
1. Initializes the `UnsafeProjection` with the [partIndex](#partIndex)
147+
148+
In the end, `generateResultProjection` creates a result projection function that does the following:
149+
150+
1. Generates results for all expression-based aggregate functions (using the `MutableProjection` with the given `currentBuffer`)
151+
1. Generates results for all [imperative aggregate functions](#allImperativeAggregateFunctions)
152+
1. Uses the `UnsafeProjection` to generate an [UnsafeRow](../UnsafeRow.md) for the current grouping key and the aggregate results
153+
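The shape of the result projection function for the Final and Complete modes can be illustrated with a toy model. Everything below is a hedged sketch and not Spark's implementation: rows are plain `Vector[Any]` values instead of `UnsafeRow`s, `ToyDeclarativeAgg` is a hypothetical stand-in for a `DeclarativeAggregate` with its `evaluateExpression`, and imperative aggregates are left out for brevity.

```scala
object FinalModeSketch {
  // Assumption: a row is modelled as an immutable vector of values.
  type Row = Vector[Any]

  // Hypothetical stand-in for a DeclarativeAggregate: `evaluate` plays the role
  // of the evaluateExpression and computes a final value from the buffer slots.
  final case class ToyDeclarativeAgg(name: String, evaluate: Row => Any)

  // sum keeps its running total in slot 0 of the buffer.
  val sum: ToyDeclarativeAgg =
    ToyDeclarativeAgg("sum", buf => buf(0).asInstanceOf[Long])

  // avg keeps a running sum in slot 1 and a count in slot 2.
  val avg: ToyDeclarativeAgg =
    ToyDeclarativeAgg("avg", buf => buf(1).asInstanceOf[Double] / buf(2).asInstanceOf[Long])

  // Returns a function of (currentGroupingKey, currentBuffer), mirroring the
  // (UnsafeRow, InternalRow) => UnsafeRow signature of generateResultProjection.
  def generateResultProjection(aggs: Seq[ToyDeclarativeAgg]): (Row, Row) => Row =
    (currentGroupingKey, currentBuffer) => {
      // Evaluate every aggregate, in order, against the current buffer.
      val aggregateResults = aggs.map(_.evaluate(currentBuffer)).toVector
      // Emit the grouping key followed by the aggregate results.
      currentGroupingKey ++ aggregateResults
    }
}
```

For example, `FinalModeSketch.generateResultProjection(Seq(FinalModeSketch.sum, FinalModeSketch.avg))` applied to the key `Vector[Any]("a")` and the buffer `Vector[Any](10L, 9.0, 3L)` yields a row with the key, the sum `10` and the average `3.0`.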
154+
#### Partial and PartialMerge { #generateResultProjection-partial-partialmerge }
155+
156+
For [Partial](../expressions/AggregateExpression.md#Partial) or [PartialMerge](../expressions/AggregateExpression.md#PartialMerge) modes, `generateResultProjection` does the following:
157+
158+
1. [Creates an UnsafeProjection](../expressions/UnsafeProjection.md#create) for the [groupingAttributes](#groupingAttributes) with the [aggBufferAttributes](../expressions/AggregateFunction.md#aggBufferAttributes) of the [aggregateFunctions](#aggregateFunctions)
159+
1. Initializes the `UnsafeProjection` with the [partIndex](#partIndex)
160+
1. Collects the [TypedImperativeAggregate](../expressions/TypedImperativeAggregate.md)s from the [aggregateFunctions](#aggregateFunctions) (as they store a generic object in an aggregation buffer, and require calling serialization before shuffling)
161+
162+
In the end, `generateResultProjection` creates a result projection function that does the following:
163+
164+
1. Requests the [TypedImperativeAggregate](../expressions/TypedImperativeAggregate.md)s (from the [aggregateFunctions](#aggregateFunctions)) to [serializeAggregateBufferInPlace](../expressions/TypedImperativeAggregate.md#serializeAggregateBufferInPlace) with the given `currentBuffer`
165+
1. Uses the `UnsafeProjection` to generate an [UnsafeRow](../UnsafeRow.md) with the current grouping key and buffer
166+
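The Partial and PartialMerge steps can be modelled in the same toy style. This is a sketch under stated assumptions, not Spark's code: `ToyTypedAgg` is a hypothetical stand-in for a `TypedImperativeAggregate` that keeps a `Set[Int]` as its generic buffer object, and the serialization format (a sorted, comma-separated UTF-8 string) is invented for the example.

```scala
object PartialModeSketch {
  // Hypothetical stand-in for a TypedImperativeAggregate whose generic
  // object (a Set[Int] here) lives in one slot of the aggregation buffer.
  final case class ToyTypedAgg(slot: Int) {
    // Replaces the object in the buffer slot with its serialized bytes.
    // The format is an assumption made for this sketch only.
    def serializeAggregateBufferInPlace(buffer: Array[Any]): Unit = {
      val obj = buffer(slot).asInstanceOf[Set[Int]]
      buffer(slot) = obj.toSeq.sorted.mkString(",").getBytes("UTF-8")
    }
  }

  // Returns a function of (currentGroupingKey, currentBuffer).
  def generateResultProjection(
      typedAggs: Seq[ToyTypedAgg]): (Vector[Any], Array[Any]) => Vector[Any] =
    (currentGroupingKey, currentBuffer) => {
      // Step 1: serialize the generic objects so the buffer can be shuffled.
      typedAggs.foreach(_.serializeAggregateBufferInPlace(currentBuffer))
      // Step 2: emit the grouping key followed by the (serialized) buffer.
      currentGroupingKey ++ currentBuffer.toVector
    }
}
```

Unlike the Final and Complete sketch, nothing is evaluated here: the buffer is passed through as-is, apart from the in-place serialization.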
167+
#### No Modes { #generateResultProjection-no-modes }
168+
169+
For no aggregate modes, `generateResultProjection`...FIXME
122170

123171
## Initializing Aggregation Buffer { #initializeBuffer }
124172

docs/aggregations/TungstenAggregationIterator.md

+12
@@ -334,6 +334,18 @@ While [switchToSortBasedAggregation](#switchToSortBasedAggregation), `TungstenAg
334334

335335
Peak memory consumption can be monitored using [peakMemory](#peakMemory) performance metric.
336336

337+
## Generating Result Projection { #generateResultProjection }
338+
339+
??? note "AggregationIterator"
340+
341+
```scala
342+
generateResultProjection(): (UnsafeRow, InternalRow) => UnsafeRow
343+
```
344+
345+
`generateResultProjection` is part of the [AggregationIterator](AggregationIterator.md#generateResultProjection) abstraction.
346+
347+
`generateResultProjection` uses an `UnsafeRowJoiner` as a faster path than a projection for partial aggregation, i.e. when the [aggregateExpressions](#aggregateExpressions) use [aggregation modes](../expressions/AggregateExpression.md#mode), none of which is [Final](../expressions/AggregateExpression.md#Final) or [Complete](../expressions/AggregateExpression.md#Complete).
348+
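The difference between joining rows and projecting them can be illustrated with a small model. This sketch does not use `UnsafeRowJoiner` itself; `RowJoinerSketch`, `projectFields` and `joinRows` are hypothetical names, rows are plain `Array[Any]` values, and the assumption is that the faster path comes from copying the key and buffer wholesale instead of evaluating one expression per output field.

```scala
object RowJoinerSketch {
  // A toy "expression" that computes one output field from key and buffer.
  type FieldExpr = (Array[Any], Array[Any]) => Any

  // Projection-style: evaluate one expression per output field.
  def projectFields(exprs: Seq[FieldExpr])(key: Array[Any], buffer: Array[Any]): Array[Any] =
    exprs.map(expr => expr(key, buffer)).toArray

  // Joiner-style: copy the key and the buffer into one output row.
  def joinRows(key: Array[Any], buffer: Array[Any]): Array[Any] = {
    val out = new Array[Any](key.length + buffer.length)
    System.arraycopy(key, 0, out, 0, key.length)
    System.arraycopy(buffer, 0, out, key.length, buffer.length)
    out
  }
}
```

Both paths produce the same row when every projected field is a plain reference to a key or buffer field, which is the situation in partial aggregation as described above.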
337349
## Demo
338350

339351
```text

docs/expressions/AggregateExpression.md

+4-4
@@ -34,7 +34,7 @@ A filter is used in [Partial](#Partial) and [Complete](#Complete) modes only (cf
3434

3535
* `Partial` and `Complete` or `PartialMerge` and `Final` pairs are supported
3636

37-
### <span id="Complete"> Complete
37+
### Complete { #Complete }
3838

3939
No prefix (in [toString](#toString))
4040

@@ -44,16 +44,16 @@ Used when:
4444
* `TungstenAggregationIterator` is requested for the [switchToSortBasedAggregation](../aggregations/TungstenAggregationIterator.md#switchToSortBasedAggregation)
4545
* _others_
4646

47-
### <span id="Final"> Final
47+
### Final { #Final }
4848

4949
No prefix (in [toString](#toString))
5050

51-
### <span id="Partial"> Partial
51+
### Partial { #Partial }
5252

5353
* Partial aggregation
5454
* `partial_` prefix (in [toString](#toString))
5555

56-
### <span id="PartialMerge"> PartialMerge
56+
### PartialMerge { #PartialMerge }
5757

5858
* `merge_` prefix (in [toString](#toString))
5959

docs/expressions/DeclarativeAggregate.md

+5-1
@@ -1,6 +1,10 @@
1+
---
2+
title: DeclarativeAggregate
3+
---
4+
15
# DeclarativeAggregate Expression-Based Functions
26

3-
`DeclarativeAggregate` is an [extension](#contract) of the [AggregateFunction](AggregateFunction.md) abstraction for [Catalyst Expression-based aggregate functions](#implementations) that use [Catalyst Expression](Expression.md) for evaluation.
7+
`DeclarativeAggregate` is an [extension](#contract) of the [AggregateFunction](AggregateFunction.md) abstraction for [expression-based aggregate functions](#implementations) that use [Catalyst Expression](Expression.md) for evaluation.
48

59
`DeclarativeAggregate` is an [Unevaluable](Unevaluable.md).
610
