Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the bug when aggregation stops before a group is finished #3177

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.apple.foundationdb.annotation.API;
import com.apple.foundationdb.async.AsyncUtil;
import com.apple.foundationdb.record.ByteArrayContinuation;
import com.apple.foundationdb.record.RecordCursor;
import com.apple.foundationdb.record.RecordCursorContinuation;
import com.apple.foundationdb.record.RecordCursorResult;
Expand Down Expand Up @@ -57,77 +58,104 @@ public class AggregateCursor<M extends Message> implements RecordCursor<QueryRes
// Previous non-empty record processed by this cursor
@Nullable
private RecordCursorResult<QueryResult> previousValidResult;
// last row in last group, is null if the current group is the first group
@Nullable
private RecordCursorResult<QueryResult> lastInLastGroup;
@Nullable
byte[] continuation;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
byte[] continuation;
@Nullable
private byte[] continuation;

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this still hasn't marked the continuation field as private. Is there a reason it needs package visibility?


public AggregateCursor(@Nonnull RecordCursor<QueryResult> inner,
@Nonnull final StreamGrouping<M> streamGrouping,
boolean isCreateDefaultOnEmpty) {
boolean isCreateDefaultOnEmpty,
@Nullable byte[] continuation) {
this.inner = inner;
this.streamGrouping = streamGrouping;
this.isCreateDefaultOnEmpty = isCreateDefaultOnEmpty;
this.continuation = continuation;
}

@Nonnull
@Override
public CompletableFuture<RecordCursorResult<QueryResult>> onNext() {
if (previousResult != null && !previousResult.hasNext()) {
// we are done
return CompletableFuture.completedFuture(RecordCursorResult.exhausted());
return CompletableFuture.completedFuture(RecordCursorResult.withoutNextValue(previousResult.getContinuation(),
previousResult.getNoNextReason()));
}

return AsyncUtil.whileTrue(() -> inner.onNext().thenApply(innerResult -> {
previousResult = innerResult;
if (!innerResult.hasNext()) {
if (!isNoRecords() || (isCreateDefaultOnEmpty && streamGrouping.isResultOnEmpty())) {
// the method streamGrouping.finalizeGroup() computes previousCompleteResult and resets the accumulator
streamGrouping.finalizeGroup();
}
return false;
} else {
final QueryResult queryResult = Objects.requireNonNull(innerResult.get());
boolean groupBreak = streamGrouping.apply(queryResult);
if (!groupBreak) {
if (groupBreak) {
lastInLastGroup = previousValidResult;
} else {
// previousValidResult is the last row before group break, it sets the continuation
previousValidResult = innerResult;
}
return (!groupBreak);
}
}), getExecutor()).thenApply(vignore -> {
if (isNoRecords()) {
// Edge case where there are no records at all
if (isCreateDefaultOnEmpty && streamGrouping.isResultOnEmpty()) {
return RecordCursorResult.withNextValue(QueryResult.ofComputed(streamGrouping.getCompletedGroupResult()), RecordCursorStartContinuation.START);
// either innerResult.hasNext() = false; or groupBreak = true
if (Verify.verifyNotNull(previousResult).hasNext()) {
// in this case groupBreak = true, return aggregated result and continuation
RecordCursorContinuation continuation = Verify.verifyNotNull(previousValidResult).getContinuation();
/*
* Update the previousValidResult to the next continuation even though it hasn't been returned. This is to return the correct continuation when there are single-element groups.
* Below is an example that shows how continuation(previousValidResult) moves:
* Initial: previousResult = null, previousValidResult = null
row0 groupKey0 groupBreak = False previousValidResult = row0 previousResult = row0
row1 groupKey0 groupBreak = False previousValidResult = row1 previousResult = row1
row2 groupKey1 groupBreak = True previousValidResult = row1 previousResult = row2
* returns result (groupKey0, continuation = row1), and set previousValidResult = row2
*
* Now there are 2 scenarios, 1) the current iteration continues; 2) the current iteration stops
* In scenario 1, the iteration continues, it gets to row3:
row3 groupKey2 groupBreak = True previousValidResult = row2 previousResult = row3
* returns result (groupKey1, continuation = row2), and set previousValidResult = row3
*
* In scenario 2, a new iteration starts from row2 (because the last returned continuation = row1), and set initial previousResult = null, previousValidResult = null:
row2 groupKey1 groupBreak = False previousValidResult = row2 previousResult = row2
* (Note that because a new iteration starts, groupBreak = False for row2.)
row3 groupKey2 groupBreak = True previousValidResult = row2 previousResult = row3
* returns result (groupKey1, continuation = row2), and set previousValidResult = row3
*
* Both scenarios return the correct result, and in both the continuation is set to row3 in the end; row2 is scanned twice if a new iteration starts.
*/
previousValidResult = previousResult;
return RecordCursorResult.withNextValue(QueryResult.ofComputed(streamGrouping.getCompletedGroupResult()), continuation);
} else {
if (Verify.verifyNotNull(previousResult).getNoNextReason() == NoNextReason.SOURCE_EXHAUSTED) {
if (previousValidResult == null) {
return RecordCursorResult.exhausted();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to check whether to insert the empty result when isCreateDefaultOnEmpty is set? It feels like that information is being lost here.

Perhaps it would be simpler if there were fewer moving parts here. If we wait until 4.2, we could drop support entirely for isCreateDefaultOnEmpty (see #3107) which may simplify this somewhat.

} else {
RecordCursorContinuation continuation = previousValidResult.getContinuation();
previousValidResult = previousResult;
return RecordCursorResult.withNextValue(QueryResult.ofComputed(streamGrouping.getCompletedGroupResult()), continuation);
}
} else {
return RecordCursorResult.exhausted();
RecordCursorContinuation currentContinuation;
// in the current scan, if current group is the first group, set the continuation to the start of the current scan
// otherwise set the continuation to the last row in the last group
if (lastInLastGroup == null) {
currentContinuation = continuation == null ? RecordCursorStartContinuation.START : ByteArrayContinuation.fromNullable(continuation);
} else {
currentContinuation = lastInLastGroup.getContinuation();
}
previousValidResult = lastInLastGroup;
return RecordCursorResult.withoutNextValue(currentContinuation, Verify.verifyNotNull(previousResult).getNoNextReason());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this also set previousResult? I think as written, if you call getNext a second time after getting one of these out-of-bounds values, you'll get an advanced continuation.

That is:

RecordCursor<T> aggCursor = getAggregateCursor();
RecordCursorResult<T> withValueResult = aggCursor.getNext(); // has a value
RecordCursorResult<T> withoutValueResult1 = aggCursor.getNext(); // first result with out-of-bound limit; continuation borrowed from withValueResult
RecordCursorResult<T> withoutValueResult2 = aggCursor.getNext(); // second result with out-of-bound limit; continuation may be greater than withoutValueResult1

In general, once a cursor returns a RecordCursorResult without a next value, it should continue to return the same result if getNext() is called again.

}
}
// Use the last valid result for the continuation as we need non-terminal one here.
RecordCursorContinuation continuation = Verify.verifyNotNull(previousValidResult).getContinuation();
/*
* Update the previousValidResult to the next continuation even though it hasn't been returned. This is to return the correct continuation when there are single-element groups.
* Below is an example that shows how continuation(previousValidResult) moves:
* Initial: previousResult = null, previousValidResult = null
row0 groupKey0 groupBreak = False previousValidResult = row0 previousResult = row0
row1 groupKey0 groupBreak = False previousValidResult = row1 previousResult = row1
row2 groupKey1 groupBreak = True previousValidResult = row1 previousResult = row2
* returns result (groupKey0, continuation = row1), and set previousValidResult = row2
*
* Now there are 2 scenarios, 1) the current iteration continues; 2) the current iteration stops
* In scenario 1, the iteration continues, it gets to row3:
row3 groupKey2 groupBreak = True previousValidResult = row2 previousResult = row3
* returns result (groupKey1, continuation = row2), and set previousValidResult = row3
*
* In scenario 2, a new iteration starts from row2 (because the last returned continuation = row1), and set initial previousResult = null, previousValidResult = null:
row2 groupKey1 groupBreak = False previousValidResult = row2 previousResult = row2
* (Note that because a new iteration starts, groupBreak = False for row2.)
row3 groupKey2 groupBreak = True previousValidResult = row2 previousResult = row3
* returns result (groupKey1, continuation = row2), and set previousValidResult = row3
*
* Both scenarios return the correct result, and in both the continuation is set to row3 in the end; row2 is scanned twice if a new iteration starts.
*/
previousValidResult = previousResult;
return RecordCursorResult.withNextValue(QueryResult.ofComputed(streamGrouping.getCompletedGroupResult()), continuation);
});
}

/**
 * Tells whether the cursor has finished without ever recording a valid row.
 *
 * @return {@code true} when {@code previousValidResult} is still unset and the most recent
 *         inner result ({@code previousResult}) has no next value; {@code false} otherwise
 */
private boolean isNoRecords() {
    // A recorded valid row means there was at least one record; previousResult is not consulted.
    if (previousValidResult != null) {
        return false;
    }
    return !Verify.verifyNotNull(previousResult).hasNext();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public <M extends Message> RecordCursor<QueryResult> executePlan(@Nonnull FDBRec
(FDBRecordStoreBase<Message>)store,
context,
inner.getAlias());
return new AggregateCursor<>(innerCursor, streamGrouping, isCreateDefaultOnEmpty)
return new AggregateCursor<>(innerCursor, streamGrouping, isCreateDefaultOnEmpty, continuation)
.skipThenLimit(executeProperties.getSkip(),
executeProperties.getReturnedRowLimit());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,17 @@

package com.apple.foundationdb.record.provider.foundationdb.query;

import com.apple.foundationdb.record.ByteScanLimiterFactory;
import com.apple.foundationdb.record.EvaluationContext;
import com.apple.foundationdb.record.ExecuteProperties;
import com.apple.foundationdb.record.ExecuteState;
import com.apple.foundationdb.record.RecordCursor;
import com.apple.foundationdb.record.RecordCursorContinuation;
import com.apple.foundationdb.record.RecordCursorEndContinuation;
import com.apple.foundationdb.record.RecordCursorResult;
import com.apple.foundationdb.record.RecordCursorStartContinuation;
import com.apple.foundationdb.record.RecordMetaData;
import com.apple.foundationdb.record.RecordScanLimiterFactory;
import com.apple.foundationdb.record.TestRecords1Proto;
import com.apple.foundationdb.record.provider.foundationdb.FDBRecordContext;
import com.apple.foundationdb.record.query.plan.ScanComparisons;
Expand All @@ -49,11 +55,13 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -305,6 +313,34 @@ void aggregateNoRecordsNoGroupNoAggregate(final boolean useNestedResult, final i
}
}

@Test
void aggregateHitScanLimitReached() {
try (final var context = openContext()) {
openSimpleRecordStore(context, NO_HOOK);

final var plan =
new AggregationPlanBuilder(recordStore.getRecordMetaData(), "MySimpleRecord")
.withAggregateValue("num_value_2", value -> new NumericAggregationValue.Sum(NumericAggregationValue.PhysicalOperator.SUM_I, value))
.withGroupCriterion("str_value_indexed")
.build(false);

// In the testing data, there are 2 groups, each group has 3 rows.
// recordScanLimit = 5: scans 3 rows, and the 4th scan hits SCAN_LIMIT_REACHED
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure why it only scans 4 times instead of 5, but the offset=1 seems consistent.

// although the first group contains exactly 3 rows, we don't know we've finished the first group before we get to the 4th row, so nothing is returned, continuation is back to START
RecordCursorContinuation continuation1 = executePlanWithRecordScanLimit(plan, 5, null, null);
Assertions.assertEquals(RecordCursorStartContinuation.START, continuation1);
// recordScanLimit = 6: scans 4 rows, and the 5th scan hits SCAN_LIMIT_REACHED, we know that we've finished the 1st group, aggregated result is returned
RecordCursorContinuation continuation2 = executePlanWithRecordScanLimit(plan, 6, continuation1.toBytes(), resultOf("0", 3));
// continue with recordScanLimit = 5, scans 3 rows and hits SCAN_LIMIT_REACHED
// again, we don't know that we've finished the 2nd group, nothing is returned, continuation is back to where the scan starts
RecordCursorContinuation continuation3 = executePlanWithRecordScanLimit(plan, 5, continuation2.toBytes(), null);
Assertions.assertArrayEquals(continuation2.toBytes(), continuation3.toBytes());
// finish the 2nd group, aggregated result is returned, exhausted the source
RecordCursorContinuation continuation4 = executePlanWithRecordScanLimit(plan, 6, continuation3.toBytes(), resultOf("1", 12));
Assertions.assertEquals(RecordCursorEndContinuation.END, continuation4);
}
}

private static Stream<Arguments> provideArguments() {
// (boolean, rowLimit)
// setting rowLimit = 0 is equivalent to no limit
Expand Down Expand Up @@ -334,23 +370,50 @@ private void populateDB(final int numRecords) throws Exception {
}

@Nonnull
private RecordCursor<QueryResult> executePlan(final RecordQueryPlan plan, final int rowLimit, final byte[] continuation) {
private RecordCursor<QueryResult> executePlan(final RecordQueryPlan plan, final int rowLimit, final int recordScanLimit, final byte[] continuation) {
final var types = plan.getDynamicTypes();
final var typeRepository = TypeRepository.newBuilder().addAllTypes(types).build();
ExecuteState executeState;
if (recordScanLimit > 0) {
executeState = new ExecuteState(RecordScanLimiterFactory.enforce(recordScanLimit), ByteScanLimiterFactory.tracking());
} else {
executeState = ExecuteState.NO_LIMITS;
}
ExecuteProperties executeProperties = ExecuteProperties.SERIAL_EXECUTE;
executeProperties = executeProperties.setReturnedRowLimit(rowLimit);
executeProperties = executeProperties.setReturnedRowLimit(rowLimit).setState(executeState);
try {
return plan.executePlan(recordStore, EvaluationContext.forTypeRepository(typeRepository), continuation, executeProperties);
} catch (final Throwable t) {
throw Assertions.<RuntimeException>fail(t);
}
}

/**
 * Executes {@code plan} once with the given record scan limit and no returned-row limit, drains the
 * cursor until it yields a result without a next value, and checks the rows that were produced.
 *
 * @param plan the plan to execute
 * @param recordScanLimit the record scan limit handed to {@code executePlan}; a non-positive value
 *        runs without scan limits
 * @param continuation the continuation to resume from, or {@code null} to start from the beginning
 * @param expectedResult the flattened fields of the single expected row, or {@code null} if the
 *        execution is expected to produce no rows at all
 * @return the continuation carried by the final result (the one without a next value)
 */
private RecordCursorContinuation executePlanWithRecordScanLimit(final RecordQueryPlan plan, final int recordScanLimit, byte[] continuation, @Nullable List<?> expectedResult) {
    final List<QueryResult> queryResults = new ArrayList<>();
    final RecordCursorContinuation lastContinuation;
    // Close the cursor deterministically instead of leaving it open for the rest of the test.
    try (RecordCursor<QueryResult> cursor = executePlan(plan, 0, recordScanLimit, continuation)) {
        RecordCursorResult<QueryResult> result = cursor.getNext();
        while (result.hasNext()) {
            queryResults.add(result.get());
            result = cursor.getNext();
        }
        // The no-next result carries the continuation the caller resumes from.
        lastContinuation = result.getContinuation();
    }
    if (expectedResult == null) {
        Assertions.assertTrue(queryResults.isEmpty());
    } else {
        assertResults(this::assertResultFlattened, queryResults, expectedResult);
    }
    return lastContinuation;
}

private List<QueryResult> executePlanWithRowLimit(final RecordQueryPlan plan, final int rowLimit) {
byte[] continuation = null;
List<QueryResult> queryResults = new LinkedList<>();
while (true) {
RecordCursor<QueryResult> currentCursor = executePlan(plan, rowLimit, continuation);
RecordCursor<QueryResult> currentCursor = executePlan(plan, rowLimit, 0, continuation);
RecordCursorResult<QueryResult> currentCursorResult;
while (true) {
currentCursorResult = currentCursor.getNext();
Expand All @@ -369,7 +432,7 @@ private List<QueryResult> executePlanWithRowLimit(final RecordQueryPlan plan, fi

private void assertResults(@Nonnull final BiConsumer<QueryResult, List<?>> checkConsumer, @Nonnull final List<QueryResult> actual, @Nonnull final List<?>... expected) {
Assertions.assertEquals(expected.length, actual.size());
for (var i = 0 ; i < actual.size() ; i++) {
for (var i = 0; i < actual.size(); i++) {
checkConsumer.accept(actual.get(i), expected[i]);
}
}
Expand All @@ -385,7 +448,7 @@ private void assertResultFlattened(final QueryResult actual, final List<?> expec
final var resultFields = resultFieldsBuilder.build();

Assertions.assertEquals(resultFields.size(), expected.size());
for (var i = 0 ; i < resultFields.size() ; i++) {
for (var i = 0; i < resultFields.size(); i++) {
Assertions.assertEquals(expected.get(i), resultFields.get(i));
}
}
Expand Down Expand Up @@ -416,7 +479,7 @@ private void assertResultNested(final QueryResult actual, final List<?> expected
final var resultFields = resultFieldsBuilder.build();

Assertions.assertEquals(resultFields.size(), expected.size());
for (var i = 0 ; i < resultFields.size() ; i++) {
for (var i = 0; i < resultFields.size(); i++) {
Assertions.assertEquals(expected.get(i), resultFields.get(i));
}
}
Expand Down