Commit aa6176f

Fix continuation deserialization errors by deserializing streaming aggregate plans in the correct order (#3215)
For certain plans, we were getting errors when deserializing the elements of a `FDBStreamingAggregatePlan` continuation: the reader was coming across a type's reference before that type's definition. This happened because the plan was serialized and deserialized in different orders, so at deserialization time we were no longer guaranteed to encounter the definition first. I've kept the serialization order the same and updated the deserialization code path to match it, which keeps it aligned with existing continuations that are out in the wild.

This fixes #3214. I've enabled FORCE_CONTINUATIONS mode on the affected test (`aggregateIndexTestsCount`), though it mostly just works on the embedded vs. current configuration (which is enough to surface the bug). More test coverage comes from having the `StreamingAggregate` plans in one of the Record Layer tests all go through `verifySerialization`, which previously wasn't being done for those plans.
1 parent c1898b5 commit aa6176f
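
To illustrate why the read order matters, here is a minimal sketch of a definition/reference cache of the kind the commit message describes. It is a toy model, not the Record Layer's `PlanSerializationContext`: the names `OrderDemo`, `Slot`, `WriteContext`, and `ReadContext` are hypothetical and exist only for this example. The first time a value is written it is emitted in full (a definition); later occurrences are emitted as an id (a reference). A reader that consumes the slots in a different order than the writer produced them can hit a reference it cannot yet resolve.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model (hypothetical names): a value is written in full once, then by id afterwards. */
final class OrderDemo {
    /** One serialized slot: a definition carries a payload, a reference carries only the id. */
    static final class Slot {
        final int id;
        final String payload; // null means "reference to an earlier definition"

        Slot(int id, String payload) {
            this.id = id;
            this.payload = payload;
        }
    }

    /** Writer side: the first write of a payload emits a definition, later writes emit references. */
    static final class WriteContext {
        private final Map<String, Integer> ids = new HashMap<>();

        Slot write(String payload) {
            Integer known = ids.get(payload);
            if (known != null) {
                return new Slot(known, null);
            }
            int id = ids.size();
            ids.put(payload, id);
            return new Slot(id, payload);
        }
    }

    /** Reader side: a reference resolves only if its definition has already been read. */
    static final class ReadContext {
        private final Map<Integer, String> definitions = new HashMap<>();

        String read(Slot slot) {
            if (slot.payload != null) {
                definitions.put(slot.id, slot.payload);
                return slot.payload;
            }
            String resolved = definitions.get(slot.id);
            if (resolved == null) {
                throw new IllegalStateException("reference to id " + slot.id + " read before its definition");
            }
            return resolved;
        }
    }

    /** Writes the same type twice: slot 0 is the definition, slot 1 is a reference to it. */
    static Slot[] serialize() {
        WriteContext context = new WriteContext();
        return new Slot[] {context.write("type T"), context.write("type T")};
    }

    /** Returns true if reading the two slots in the given order resolves every reference. */
    static boolean readsCleanly(Slot first, Slot second) {
        ReadContext context = new ReadContext();
        try {
            context.read(first);
            context.read(second);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }
}
```

Reading the slots in write order succeeds, while swapping them fails on the unresolved reference. Because already-persisted continuations fix the write order, the reader is the side that has to adapt, which is the approach this commit takes.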

4 files changed: +49 -13 lines changed

Diff for: fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/query/plan/plans/RecordQueryStreamingAggregationPlan.java

+13 -10
@@ -35,7 +35,6 @@
 import com.apple.foundationdb.record.provider.common.StoreTimer;
 import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStoreBase;
 import com.apple.foundationdb.record.provider.foundationdb.FDBStoreTimer;
-import com.apple.foundationdb.record.query.plan.explain.ExplainPlanVisitor;
 import com.apple.foundationdb.record.query.plan.cascades.AliasMap;
 import com.apple.foundationdb.record.query.plan.cascades.CorrelationIdentifier;
 import com.apple.foundationdb.record.query.plan.cascades.Quantifier;
@@ -50,6 +49,7 @@
 import com.apple.foundationdb.record.query.plan.cascades.values.ObjectValue;
 import com.apple.foundationdb.record.query.plan.cascades.values.Value;
 import com.apple.foundationdb.record.query.plan.cascades.values.translation.TranslationMap;
+import com.apple.foundationdb.record.query.plan.explain.ExplainPlanVisitor;
 import com.apple.foundationdb.record.query.plan.serialization.PlanSerialization;
 import com.google.auto.service.AutoService;
 import com.google.common.collect.ImmutableList;
@@ -385,15 +385,18 @@ public PRecordQueryPlan toRecordQueryPlanProto(@Nonnull final PlanSerializationC
     @Nonnull
     public static RecordQueryStreamingAggregationPlan fromProto(@Nonnull final PlanSerializationContext serializationContext,
                                                                 @Nonnull final PRecordQueryStreamingAggregationPlan recordQueryStreamingAggregationPlanProto) {
-        return new RecordQueryStreamingAggregationPlan(Quantifier.Physical.fromProto(serializationContext, Objects.requireNonNull(recordQueryStreamingAggregationPlanProto.getInner())),
-                PlanSerialization.getFieldOrNull(recordQueryStreamingAggregationPlanProto,
-                        PRecordQueryStreamingAggregationPlan::hasGroupingKeyValue,
-                        m -> Value.fromValueProto(serializationContext, m.getGroupingKeyValue())),
-                (AggregateValue)Value.fromValueProto(serializationContext, Objects.requireNonNull(recordQueryStreamingAggregationPlanProto.getAggregateValue())),
-                CorrelationIdentifier.of(Objects.requireNonNull(recordQueryStreamingAggregationPlanProto.getGroupingKeyAlias())),
-                CorrelationIdentifier.of(Objects.requireNonNull(recordQueryStreamingAggregationPlanProto.getAggregateAlias())),
-                Value.fromValueProto(serializationContext, Objects.requireNonNull(recordQueryStreamingAggregationPlanProto.getCompleteResultValue())),
-                recordQueryStreamingAggregationPlanProto.hasIsCreateDefaultOnEmpty() ? recordQueryStreamingAggregationPlanProto.getIsCreateDefaultOnEmpty() : true);
+        // Note: it is important for proper deserialization (at least of things that interact with the serializationContext's cache of
+        // referenced values and plans) that we deserialize the values in the same order as they are serialized, or we may
+        // not
+        final Quantifier.Physical inner = Quantifier.Physical.fromProto(serializationContext, Objects.requireNonNull(recordQueryStreamingAggregationPlanProto.getInner()));
+        final AggregateValue aggregateValue = (AggregateValue) Value.fromValueProto(serializationContext, Objects.requireNonNull(recordQueryStreamingAggregationPlanProto.getAggregateValue()));
+        @Nullable final Value groupingKeyValue = PlanSerialization.getFieldOrNull(recordQueryStreamingAggregationPlanProto, PRecordQueryStreamingAggregationPlan::hasGroupingKeyValue,
+                m -> Value.fromValueProto(serializationContext, m.getGroupingKeyValue()));
+        final CorrelationIdentifier groupingKeyAlias = CorrelationIdentifier.of(Objects.requireNonNull(recordQueryStreamingAggregationPlanProto.getGroupingKeyAlias()));
+        final CorrelationIdentifier aggregateAlias = CorrelationIdentifier.of(Objects.requireNonNull(recordQueryStreamingAggregationPlanProto.getAggregateAlias()));
+        final Value completeResultValue = Value.fromValueProto(serializationContext, Objects.requireNonNull(recordQueryStreamingAggregationPlanProto.getCompleteResultValue()));
+        final boolean isCreateDefaultOnEmpty = recordQueryStreamingAggregationPlanProto.hasIsCreateDefaultOnEmpty() ? recordQueryStreamingAggregationPlanProto.getIsCreateDefaultOnEmpty() : true;
+        return new RecordQueryStreamingAggregationPlan(inner, groupingKeyValue, aggregateValue, groupingKeyAlias, aggregateAlias, completeResultValue, isCreateDefaultOnEmpty);
     }
 
     @Nonnull
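
The change above works because Java evaluates method and constructor arguments left to right, so deserializing inline in the constructor call ties the read order to the constructor's parameter order. The sketch below shows the same pattern in isolation. The class `EvalOrderDemo`, its `Pair` type, and the `read` helper are hypothetical stand-ins. The assumption that the aggregate value is serialized before the grouping key is inferred from the new read order in this diff, since the body of `toRecordQueryPlanProto` is not shown.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in showing how inline arguments pin the read order to the parameter order. */
final class EvalOrderDemo {
    /** Constructor takes the grouping key first, mirroring a parameter order that differs from the write order. */
    static final class Pair {
        final String groupingKey;
        final String aggregate;

        Pair(String groupingKey, String aggregate) {
            this.groupingKey = groupingKey;
            this.aggregate = aggregate;
        }
    }

    /** Pretends to deserialize a field and records the order in which fields were read. */
    static String read(List<String> log, String field) {
        log.add(field);
        return field;
    }

    /** Inline arguments: fields are read in parameter order (grouping key, then aggregate). */
    static List<String> inlineOrder() {
        List<String> log = new ArrayList<>();
        new Pair(read(log, "groupingKey"), read(log, "aggregate"));
        return log;
    }

    /** Locals first: fields are read in the assumed write order, then passed in parameter order. */
    static List<String> localsOrder() {
        List<String> log = new ArrayList<>();
        String aggregate = read(log, "aggregate");
        String groupingKey = read(log, "groupingKey");
        new Pair(groupingKey, aggregate);
        return log;
    }
}
```

Pulling each read into a named local decouples the read order from the constructor signature, so a later reordering of constructor parameters would not silently change the deserialization order again.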

Diff for: fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/query/FDBStreamAggregationTest.java

+2 -1
@@ -334,7 +334,8 @@ private void populateDB(final int numRecords) throws Exception {
     }
 
     @Nonnull
-    private RecordCursor<QueryResult> executePlan(final RecordQueryPlan plan, final int rowLimit, final byte[] continuation) {
+    private RecordCursor<QueryResult> executePlan(final RecordQueryPlan originalPlan, final int rowLimit, final byte[] continuation) {
+        final RecordQueryPlan plan = verifySerialization(originalPlan);
         final var types = plan.getDynamicTypes();
         final var typeRepository = TypeRepository.newBuilder().addAllTypes(types).build();
         ExecuteProperties executeProperties = ExecuteProperties.SERIAL_EXECUTE;
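
Routing every plan through a serialization check before executing it is a round-trip test: serialize, deserialize, compare, and then run the deserialized copy. What `verifySerialization` does internally is not shown in this diff, so the sketch below only illustrates the general pattern. `RoundTripDemo` is a hypothetical name, and it uses `java.io` object serialization as a stand-in for the plan protobuf format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical round-trip helper; java.io serialization stands in for the real plan format. */
final class RoundTripDemo {
    /** Serializes the value, reads it back, checks equality, and returns the deserialized copy. */
    static <T extends Serializable> T roundTrip(T original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                if (!original.equals(copy)) {
                    throw new AssertionError("round trip changed the value");
                }
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }
}
```

Returning the copy rather than the original means the rest of the test exercises the deserialized object, so a deserialization bug shows up as a test failure instead of going unnoticed.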

Diff for: yaml-tests/src/test/java/YamlIntegrationTests.java

-2
@@ -141,8 +141,6 @@ public void aggregateEmptyTable(YamlTest.Runner runner) throws Exception {
     }
 
     @TestTemplate
-    @ExcludeYamlTestConfig(value = YamlTestConfigFilters.DO_NOT_FORCE_CONTINUATIONS,
-            reason = "Infinite continuation loop (https://github.com/FoundationDB/fdb-record-layer/issues/3095)")
     public void aggregateIndexTestsCount(YamlTest.Runner runner) throws Exception {
         runner.runYamsql("aggregate-index-tests-count.yamsql");
     }

Diff for: yaml-tests/src/test/resources/aggregate-index-tests-count.yamsql

+34
@@ -47,6 +47,8 @@ test_block:
     -
       - query: select count(*) from t1
       - explain: "AISCAN(MV1 <,> BY_GROUP -> [_0: VALUE:[0]]) | MAP (_ AS _0) | ON EMPTY NULL | MAP (coalesce_long(_._0._0, promote(0l AS LONG)) AS _0)"
+      # Cannot run with FORCE_CONTINUATIONS due to: https://github.com/FoundationDB/fdb-record-layer/issues/3206
+      - maxRows: 0
       - result: [{4}]
     -
       - query: select count(*) from t1 group by col2
@@ -55,6 +57,8 @@ test_block:
     -
       - query: select count(col1) from t1
       - explain: "AISCAN(MV3 <,> BY_GROUP -> [_0: VALUE:[0]]) | MAP (_ AS _0) | ON EMPTY NULL | MAP (coalesce_long(_._0._0, promote(0l AS LONG)) AS _0)"
+      # Cannot run with FORCE_CONTINUATIONS due to: https://github.com/FoundationDB/fdb-record-layer/issues/3206
+      - maxRows: 0
       - result: [{2}]
     -
       - query: select count(col1) from t1 group by col2
@@ -81,17 +85,47 @@ test_block:
     -
       - query: select count(*) from t2
       - explain: "ISCAN(MV5 <,>) | MAP (_ AS _0) | AGG (count_star(*) AS _0) | ON EMPTY NULL | MAP (coalesce_long(_._0._0, promote(0l AS LONG)) AS _0)"
+      # Cannot run with FORCE_CONTINUATIONS due to: https://github.com/FoundationDB/fdb-record-layer/issues/3206
+      - maxRows: 0
       - result: [{4}]
     -
       - query: select count(*) from t2 group by col2
+      # Plan deserialization previously failed : https://github.com/FoundationDB/fdb-record-layer/issues/3214
+      - supported_version: !current_version
       - explain: "ISCAN(MV5 <,>) | MAP (_ AS _0) | AGG (count_star(*) AS _0) GROUP BY (_._0.COL2 AS _0) | MAP (_._1._0 AS _0)"
       - result: [{1}, {3}]
+    -
+      # Same as above test, but tests serialization upgrades from before !current_version. Can be removed once we no longer
+      # care about upgrading to that version from older versions
+      - query: select count(*) from t2 group by col2
+      - maxRows: 1
+      - initialVersionLessThan: !current_version
+      - result: [{1}]
+      - result: [{2}] # Off by one due to: https://github.com/FoundationDB/fdb-record-layer/issues/3097 (also fixed in !current_version)
+      - error: 'XX000' # plan fails to deserialize on older server
+      - initialVersionAtLeast: !current_version
+      # Covered in above test case
     -
       - query: select count(col1) from t2
       - explain: "ISCAN(MV5 <,>) | MAP (_ AS _0) | AGG (count(_._0.COL1) AS _0) | ON EMPTY NULL | MAP (coalesce_long(_._0._0, promote(0l AS LONG)) AS _0)"
+      # Cannot run with FORCE_CONTINUATIONS due to: https://github.com/FoundationDB/fdb-record-layer/issues/3206
+      - maxRows: 0
       - result: [{2}]
     -
       - query: select count(col1) from t2 group by col2
+      # Plan deserialization previously failed : https://github.com/FoundationDB/fdb-record-layer/issues/3214
+      - supported_version: !current_version
       - explain: "ISCAN(MV5 <,>) | MAP (_ AS _0) | AGG (count(_._0.COL1) AS _0) GROUP BY (_._0.COL2 AS _0) | MAP (_._1._0 AS _0)"
       - result: [{1}, {1}]
+    -
+      # Same as above test, but tests serialization upgrades from before !current_version. Can be removed once we no longer
+      # care about upgrading to that version from older versions
+      - query: select count(col1) from t2 group by col2
+      - maxRows: 1
+      - initialVersionLessThan: !current_version
+      - result: [{1}]
+      - result: [{1}]
+      - error: 'XX000' # plan fails to deserialize on older server
+      - initialVersionAtLeast: !current_version
+      # Covered in above test case
 ...
