forked from facebookincubator/velox
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add reduce-agg blog post (facebookincubator#6851)
Summary: Pull Request resolved: facebookincubator#6851 Reviewed By: xiaoxmeng, spershin Differential Revision: D49834730 Pulled By: mbasmanova fbshipit-source-id: 43c17ed4a193a5857fbdac34d3d41f71854d7f40
- Loading branch information
1 parent
97ff49f
commit 365fd7e
Showing
1 changed file
with
178 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,178 @@ | ||
--- | ||
slug: reduce-agg | ||
title: reduce_agg lambda aggregate function | ||
authors: [mbasmanova] | ||
tags: [tech-blog,functions] | ||
--- | ||
|
||
## Definition | ||
|
||
<a href="https://facebookincubator.github.io/velox/functions/presto/aggregate.html#reduce_agg">Reduce_agg</a> | ||
is the only lambda aggregate Presto function. It allows users to define arbitrary aggregation | ||
logic using 2 lambda functions. | ||
|
||
``` | ||
reduce_agg(inputValue T, initialState S, inputFunction(S, T, S), combineFunction(S, S, S)) → S | ||
Reduces all non-NULL input values into a single value. inputFunction will be invoked for | ||
each non-NULL input value. If all inputs are NULL, the result is NULL. In addition to taking | ||
the input value, inputFunction takes the current state, initially initialState, and returns the | ||
new state. combineFunction will be invoked to combine two states into a new state. The final | ||
state is returned. Throws an error if initialState is NULL or inputFunction or combineFunction | ||
returns a NULL. | ||
``` | ||
|
||
Once can think of reduce_agg as using inputFunction to implement partial aggregation and | ||
combineFunction to implement final aggregation. Partial aggregation processes a list of | ||
input values and produces an intermediate state: | ||
|
||
``` | ||
auto s = initialState; | ||
for (auto x : input) { | ||
s = inputFunction(s, x); | ||
} | ||
return s; | ||
``` | ||
|
||
Final aggregation processes a list of intermediate states and computes the final state. | ||
|
||
``` | ||
auto s = intermediates[0]; | ||
for (auto i = 1; i < intermediates.size(); ++i) | ||
s = combineFunction(s, intermediates[i]); | ||
} | ||
return s; | ||
``` | ||
|
||
For example, one can implement SUM aggregation using reduce_agg as follows: | ||
|
||
``` | ||
reduce_agg(c, 0, (s, x) -> s + x, (s, s2) -> s + s2) | ||
``` | ||
|
||
Implementation of AVG aggregation is a bit trickier. For AVG, state is a tuple of sum and | ||
count. Hence, reduce_agg can be used to compute (sum, count) pair, but it cannot compute | ||
the actual average. One needs to apply a scalar function on top of reduce_agg to get the | ||
average. | ||
|
||
``` | ||
SELECT id, sum_and_count.sum / sum_and_count.count FROM ( | ||
SELECT id, reduce_agg(value, CAST(row(0, 0) AS row(sum double, count bigint)), | ||
(s, x) -> CAST(row(s.sum + x, s.count + 1) AS row(sum double, count bigint)), | ||
(s, s2) -> CAST(row(s.sum + s2.sum, s.count + s2.count) AS row(sum double, count bigint))) AS sum_and_count | ||
FROM t | ||
GROUP BY id | ||
); | ||
``` | ||
|
||
The examples of using reduce_agg to compute SUM and AVG are for illustrative purposes. | ||
One should not use reduce_agg if a specialized aggregation function is available. | ||
|
||
One use case for reduce_agg we see in production is to compute a product of input values. | ||
|
||
``` | ||
reduce_agg(c, 1.0, (s, x) -> s * x, (s, s2) -> s * s2) | ||
``` | ||
|
||
Another example is to compute a list of top N distinct values from all input arrays. | ||
|
||
``` | ||
reduce_agg(x, array[], | ||
(a, b) -> slice(reverse(array_sort(array_distinct(concat(a, b)))), 1, 1000), | ||
(a, b) -> slice(reverse(array_sort(array_distinct(concat(a, b)))), 1, 1000)) | ||
``` | ||
|
||
Note that this is equivalent to the following query: | ||
|
||
``` | ||
SELECT array_agg(v) FROM ( | ||
SELECT DISTINCT v | ||
FROM t, UNNEST(x) AS u(v) | ||
ORDER BY v DESC | ||
LIMIT 1000 | ||
) | ||
``` | ||
|
||
## Implementation | ||
|
||
Efficient implementation of reduce_agg lambda function is not straightforward. Let’s | ||
consider the logic for partial aggregation. | ||
|
||
``` | ||
auto s = initialState; | ||
for (auto x : input) { | ||
s = inputFunction(s, x); | ||
} | ||
``` | ||
|
||
This is a data-dependent loop, i.e. the next loop iteration depends on the results of | ||
the previous iteration. inputFunction needs to be invoked on each input value `x` | ||
separately. Since inputFunction is a user-defined lambda, invoking inputFunction means | ||
evaluating an expression. And since expression evaluation in Velox is optimized for | ||
processing large batches of values at a time, evaluating expressions on one value at | ||
a time is very inefficient. To optimize the implementation of reduce_agg we need to | ||
reduce the number of times we evaluate user-defined lambdas and increase the number | ||
of values we process each time. | ||
|
||
One approach is to | ||
|
||
1. convert all input values into states by evaluating inputFunction(initialState, x); | ||
1. split states into pairs and evaluate combineFunction on all pairs; | ||
1. repeat step (2) until we have only one state left. | ||
|
||
Let’s say we have 1024 values to process. Step 1 evaluates inputFunction expression | ||
on 1024 values at once. Step 2 evaluates combineFunction on 512 pairs, then on 256 | ||
pairs, then on 128 pairs, 64, 32, 16, 8, 4, 2, finally producing a single state. | ||
Step 2 evaluates combineFunction 9 times. In total, this implementation evaluates | ||
user-defined expressions 10 times on multiple values each time. This is a lot more | ||
efficient than the original implementation that evaluates user-defined expressions | ||
1024 times. | ||
|
||
In general, given N inputs, the original implementation evaluates expressions N times | ||
while the new one log2(N) times. | ||
|
||
Note that in case when N is not a power of two, splitting states into pairs may leave | ||
an extra state. For example, splitting 11 states produces 5 pairs + one extra state. | ||
In this case, we set aside the extra state, evaluate combineFunction on 5 pairs, then | ||
bring extra state back to a total of 6 states and continue. | ||
|
||
A benchmark, velox/functions/prestosql/aggregates/benchmarks/ReduceAgg.cpp, shows that | ||
initial implementation of reduce_agg is 60x slower than SUM, while the optimized | ||
implementation is only 3x slower. A specialized aggregation function will always be | ||
more efficient than generic reduce_agg, hence, reduce_agg should be used only when | ||
specialized aggregation function is not available. | ||
|
||
Finally, a side effect of the optimized implementation is that it doesn't support | ||
applying reduce_agg to sorted inputs. I.e. one cannot use reduce_agg to compute an | ||
equivalent of | ||
|
||
``` | ||
SELECT a, array_agg(b ORDER BY b) FROM t GROUP BY 1 | ||
``` | ||
|
||
The array_agg computation depends on order of inputs. A comparable implementation | ||
using reduce_agg would look like this: | ||
|
||
``` | ||
SELECT a, | ||
reduce_agg(b, array[], | ||
(s, x) -> concat(s, array[x]), | ||
(s, s2) -> concat(s, s2) | ||
ORDER BY b) | ||
FROM t GROUP BY 1 | ||
``` | ||
|
||
To respect ORDER BY b, the reduce_agg would have to apply inputFunction to each | ||
input value one at a time using a data-dependent loop from above. As we saw, this | ||
is very expensive. The optimization we apply does not preserve the order of inputs, | ||
hence, cannot support the query above. Note that | ||
<a href="https://github.com/facebookincubator/velox/issues/6434">Presto</a> doesn't | ||
support applying reduce_agg to sorted inputs either. | ||
|
||
|
||
Thank you <a href="https://www.linkedin.com/in/orrierling">Orri Erling</a> for brainstorming | ||
and <a href="https://www.linkedin.com/in/xiaoxuanmeng">Xiaoxuan Meng</a> and | ||
<a href="https://www.linkedin.com/in/pedro-pedreira/">Pedro Eugênio Rocha Pedreira</a> for | ||
reviewing the code. |