Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rollup aggregate #2916

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@

import com.apple.foundationdb.record.RecordCoreException;
import com.apple.foundationdb.record.query.plan.cascades.PredicateMultiMap.ExpandCompensationFunction;
import com.apple.foundationdb.record.query.plan.cascades.expressions.GroupByExpression;
import com.apple.foundationdb.record.query.plan.cascades.expressions.LogicalFilterExpression;
import com.apple.foundationdb.record.query.plan.cascades.expressions.RelationalExpression;
import com.apple.foundationdb.record.query.plan.cascades.predicates.QueryPredicate;
import com.apple.foundationdb.record.query.plan.cascades.rules.DataAccessRule;
import com.apple.foundationdb.record.query.plan.cascades.values.AggregateValue;
import com.apple.foundationdb.record.query.plan.cascades.values.Value;
import com.apple.foundationdb.record.query.plan.cascades.values.translation.TranslationMap;
import com.google.common.base.Suppliers;
Expand All @@ -37,6 +39,7 @@
import com.google.common.collect.Sets;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Collection;
import java.util.IdentityHashMap;
import java.util.Map;
Expand Down Expand Up @@ -835,4 +838,51 @@ public RelationalExpression apply(@Nonnull final Memoizer memoizer, @Nonnull Rel
}
}
}

/**
* Compensation class for group by matches that require a rollup.
*/
class RollupCompensation implements Compensation {

@Nullable
private final Value groupingValue;

@Nonnull
private final AggregateValue aggregateValue;

@Nonnull
private final Compensation childCompensation;

@Nonnull
private final CorrelationIdentifier matchedQuantifier;

public RollupCompensation(@Nullable final Value groupingValue,
@Nonnull final AggregateValue aggregateValue,
@Nonnull final Compensation childCompensation,
@Nonnull final CorrelationIdentifier matchedQuantifier) {
this.groupingValue = groupingValue;
this.aggregateValue = aggregateValue;
this.childCompensation = childCompensation;
this.matchedQuantifier = matchedQuantifier;
}

@Nonnull
@Override
public RelationalExpression apply(@Nonnull final Memoizer memoizer, @Nonnull RelationalExpression relationalExpression) {
// apply the child as needed
if (childCompensation.isNeeded()) {
relationalExpression = childCompensation.apply(memoizer, relationalExpression);
}

final var newBaseQuantifier = Quantifier.forEach(memoizer.memoizeReference(Reference.of(relationalExpression)),
matchedQuantifier);

return new GroupByExpression(groupingValue, aggregateValue, GroupByExpression::nestedResults, newBaseQuantifier);
}

@Override
public boolean isNeededForFiltering() {
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,18 @@ public class MatchInfo {
@Nonnull
private final QueryPlanConstraint additionalPlanConstraint;

@Nonnull
private final boolean isRollupRequired;

private MatchInfo(@Nonnull final Map<CorrelationIdentifier, ComparisonRange> parameterBindingMap,
@Nonnull final IdentityBiMap<Quantifier, PartialMatch> quantifierToPartialMatchMap,
@Nonnull final PredicateMultiMap predicateMap,
@Nonnull final PredicateMultiMap accumulatedPredicateMap,
@Nonnull final List<MatchedOrderingPart> matchedOrderingParts,
@Nonnull final Optional<Value> remainingComputationValueOptional,
@Nonnull final MaxMatchMap maxMatchMap,
@Nonnull final QueryPlanConstraint additionalPlanConstraint) {
@Nonnull final QueryPlanConstraint additionalPlanConstraint,
final boolean isRollupRequired) {
this.parameterBindingMap = ImmutableMap.copyOf(parameterBindingMap);
this.quantifierToPartialMatchMap = quantifierToPartialMatchMap.toImmutable();
this.aliasToPartialMatchMapSupplier = Suppliers.memoize(() -> {
Expand All @@ -113,6 +117,7 @@ private MatchInfo(@Nonnull final Map<CorrelationIdentifier, ComparisonRange> par
return mapBuilder.build();
});
this.accumulatedPredicateMap = accumulatedPredicateMap;
this.isRollupRequired = isRollupRequired;
this.constraintsSupplier = Suppliers.memoize(this::computeConstraints);
this.predicateMap = predicateMap;
this.accumulatedPredicateMapSupplier = Suppliers.memoize(() -> {
Expand Down Expand Up @@ -192,6 +197,10 @@ public QueryPlanConstraint getAdditionalPlanConstraint() {
return additionalPlanConstraint;
}

public boolean isRollupRequired() {
return isRollupRequired;
}

@Nonnull
public MatchInfo withOrderingInfo(@Nonnull final List<MatchedOrderingPart> matchedOrderingParts) {
return new MatchInfo(parameterBindingMap,
Expand All @@ -201,7 +210,24 @@ public MatchInfo withOrderingInfo(@Nonnull final List<MatchedOrderingPart> match
matchedOrderingParts,
remainingComputationValueOptional,
maxMatchMap,
additionalPlanConstraint);
additionalPlanConstraint,
false);
}

@Nonnull
public MatchInfo withRequiredRollup(boolean requiredRollup) {
if (requiredRollup == this.isRollupRequired) {
return this;
}
return new MatchInfo(parameterBindingMap,
quantifierToPartialMatchMap,
predicateMap,
accumulatedPredicateMap,
matchedOrderingParts,
remainingComputationValueOptional,
maxMatchMap,
additionalPlanConstraint,
requiredRollup);
}

@Nonnull
Expand Down Expand Up @@ -272,6 +298,14 @@ public static Optional<MatchInfo> tryMerge(@Nonnull final IdentityBiMap<Quantifi
return Optional.empty();
}

final var requiresRollup = regularQuantifiers.stream()
.map(key -> Objects.requireNonNull(partialMatchMap.getUnwrapped(key))) // always guaranteed
.anyMatch(partialMatch -> partialMatch.getMatchInfo().isRollupRequired());

if (requiresRollup && partialMatchMap.size() > 1) {
return Optional.empty();
}

return mergedParameterBindingsOptional
.map(mergedParameterBindings -> new MatchInfo(mergedParameterBindings,
partialMatchMap,
Expand All @@ -280,7 +314,8 @@ public static Optional<MatchInfo> tryMerge(@Nonnull final IdentityBiMap<Quantifi
orderingParts,
remainingComputationValueOptional,
maxMatchMap,
additionalPlanConstraint));
additionalPlanConstraint,
requiresRollup));
}

public static Optional<Map<CorrelationIdentifier, ComparisonRange>> tryMergeParameterBindings(final Collection<Map<CorrelationIdentifier, ComparisonRange>> parameterBindingMaps) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,11 @@
import com.apple.foundationdb.record.query.plan.cascades.typing.Type;
import com.apple.foundationdb.record.query.plan.cascades.values.AggregateValue;
import com.apple.foundationdb.record.query.plan.cascades.values.FieldValue;
import com.apple.foundationdb.record.query.plan.cascades.values.IndexableAggregateValue;
import com.apple.foundationdb.record.query.plan.cascades.values.RecordConstructorValue;
import com.apple.foundationdb.record.query.plan.cascades.values.StreamableAggregateValue;
import com.apple.foundationdb.record.query.plan.cascades.values.Value;
import com.apple.foundationdb.record.query.plan.cascades.values.Values;
import com.apple.foundationdb.record.query.plan.cascades.values.translation.MaxMatchMap;
import com.apple.foundationdb.record.query.plan.cascades.values.translation.TranslationMap;
import com.google.common.base.Suppliers;
Expand All @@ -53,6 +56,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.commons.lang3.mutable.MutableBoolean;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -254,9 +258,43 @@ public Compensation compensate(@Nonnull final PartialMatch partialMatch,
return Compensation.impossibleCompensation();
}

if (matchInfo.isRollupRequired()) {
final var baseQuantifier = Iterables.getOnlyElement(getQuantifiers()).getAlias();
final var rolledUpAggregations = getAggregateValueAsRollup(matchInfo, baseQuantifier);
final var rolledUpGroupBy = getGroupingValueAsRollup(matchInfo, baseQuantifier);
return new Compensation.RollupCompensation(rolledUpGroupBy, rolledUpAggregations, childCompensation.orElse(Compensation.noCompensation()), baseQuantifier);
}

return Compensation.noCompensation();
}

@Nonnull
AggregateValue getAggregateValueAsRollup(@Nonnull MatchInfo matchInfo, @Nonnull CorrelationIdentifier baseQuantifier) {
final var maxMatchMap = matchInfo.getMaxMatchMap();
final var candidateResult = maxMatchMap.getCandidateResultValue();
final var aggregations = maxMatchMap.getQueryResultValue().preOrderStream()
.filter(v -> v instanceof StreamableAggregateValue || v instanceof IndexableAggregateValue).collect(ImmutableList.toImmutableList());
// replace each aggregate value child with the underlying aggregate reference, effectively creating a higher-order aggregate.
final var pulledUpAggregations = candidateResult.pullUp(aggregations, AliasMap.emptyMap(), ImmutableSet.of(), baseQuantifier);
final var rollUps = aggregations.stream().map(aggregation -> aggregation.withChildren(ImmutableList.of(pulledUpAggregations.get(aggregation))))
.collect(ImmutableList.toImmutableList());
return RecordConstructorValue.ofUnnamed(rollUps);
}

@Nullable
Value getGroupingValueAsRollup(@Nonnull MatchInfo matchInfo, @Nonnull CorrelationIdentifier baseQuantifier) {
if (groupingValue == null) {
return null;
}
final var maxMatchMap = matchInfo.getMaxMatchMap();
final var candidateResult = maxMatchMap.getCandidateResultValue();
// this pulls all group by expressions from the max match map.
final var groupByExpressions = ((RecordConstructorValue)((RecordConstructorValue)maxMatchMap.getQueryResultValue())
.getColumns().get(0).getValue()).getColumns().stream().map(Column::getValue).collect(ImmutableList.toImmutableList());
final var pulledUpGroupByExpressions = candidateResult.pullUp(groupByExpressions, AliasMap.emptyMap(), ImmutableSet.of(), baseQuantifier);
return RecordConstructorValue.ofUnnamed(groupByExpressions.stream().map(pulledUpGroupByExpressions::get).collect(ImmutableList.toImmutableList()));
}

@Nonnull
@Override
public Iterable<MatchInfo> subsumedBy(@Nonnull final RelationalExpression candidateExpression,
Expand Down Expand Up @@ -285,16 +323,52 @@ public Iterable<MatchInfo> subsumedBy(@Nonnull final RelationalExpression candid
ValueEquivalence.fromAliasMap(bindingAliasMap)
.then(ValueEquivalence.constantEquivalenceWithEvaluationContext(evaluationContext));

final MutableBoolean requiresRollup = new MutableBoolean(false);
final var subsumedBy = aggregateValue.subsumedBy(otherAggregateValue, valueEquivalence)
.compose(ignored -> {
if (groupingValue == null && otherGroupingValue == null) {
return BooleanWithConstraint.alwaysTrue();
}
if (groupingValue == null || otherGroupingValue == null) {
if (otherGroupingValue == null) {
return BooleanWithConstraint.falseValue();
}

return groupingValue.subsumedBy(otherGroupingValue, valueEquivalence);
if (groupingValue == null) {
requiresRollup.setTrue();
return BooleanWithConstraint.alwaysTrue();
}

final var sameGrouping = groupingValue.subsumedBy(otherGroupingValue, valueEquivalence);
if (sameGrouping.isTrue()) {
return sameGrouping;
}

final var groupingColumns = Values.deconstructRecord(groupingValue);
final var otherGroupingColumns = Values.deconstructRecord(otherGroupingValue);

// this is very restrictive and should employ set semantics instead.
if (groupingColumns.size() > otherGroupingColumns.size()) {
return BooleanWithConstraint.falseValue();
}

var composedEquivalence = BooleanWithConstraint.alwaysTrue();
boolean foundEquivalence = false;
for (final var groupingColumn : groupingColumns) {
for (final var otherGroupingColumn : otherGroupingColumns) {
final var equivalenceResult = groupingColumn.subsumedBy(otherGroupingColumn, valueEquivalence);
if (equivalenceResult.isTrue()) {
composedEquivalence = composedEquivalence.compose(ignored2 -> equivalenceResult);
foundEquivalence = true;
break;
}
}
if (!foundEquivalence) {
return BooleanWithConstraint.falseValue();
}
foundEquivalence = false;
}
requiresRollup.setTrue();
return composedEquivalence;
});

if (subsumedBy.isTrue()) {
Expand All @@ -307,7 +381,7 @@ public Iterable<MatchInfo> subsumedBy(@Nonnull final RelationalExpression candid
return MatchInfo.tryMerge(partialMatchMap, ImmutableMap.of(), PredicateMap.empty(),
PredicateMap.empty(), Optional.empty(),
maxMatchMap, queryPlanConstraint)
.map(ImmutableList::of)
.map(matchInfo -> ImmutableList.of(matchInfo.withRequiredRollup(requiresRollup.booleanValue())))
.orElse(ImmutableList.of());
}
return ImmutableList.of();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,11 @@ public Optional<MatchInfo> adjustMatch(@Nonnull final PartialMatch partialMatch)
return Optional.empty();
}

// underlying is a group by that requires a rollup, bailout
if (childMatchInfo.isRollupRequired()) {
return Optional.empty();
}

for (final var predicate : getPredicates()) {
if (predicate instanceof Placeholder) {
if (!((Placeholder)predicate).getRanges().isEmpty()) {
Expand Down