Skip to content

Commit 8a0273a

Browse files
committed
Include potential errors
1 parent a4a3d8c commit 8a0273a

File tree

11 files changed

+202
-60
lines changed

11 files changed

+202
-60
lines changed

modules/module-mongodb-storage/test/src/__snapshots__/storage_sync.test.ts.snap

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ exports[`sync - mongodb > compacting data - invalidate checkpoint 1`] = `
2020
"last_op_id": "2",
2121
"streams": [
2222
{
23+
"errors": [],
2324
"is_default": true,
2425
"name": "mybucket",
2526
},
@@ -131,6 +132,7 @@ exports[`sync - mongodb > expiring token 1`] = `
131132
"last_op_id": "0",
132133
"streams": [
133134
{
135+
"errors": [],
134136
"is_default": true,
135137
"name": "mybucket",
136138
},
@@ -174,6 +176,7 @@ exports[`sync - mongodb > sends checkpoint complete line for empty checkpoint 1`
174176
"last_op_id": "1",
175177
"streams": [
176178
{
179+
"errors": [],
177180
"is_default": true,
178181
"name": "mybucket",
179182
},
@@ -252,10 +255,12 @@ exports[`sync - mongodb > sync buckets in order 1`] = `
252255
"last_op_id": "2",
253256
"streams": [
254257
{
258+
"errors": [],
255259
"is_default": true,
256260
"name": "b0",
257261
},
258262
{
263+
"errors": [],
259264
"is_default": true,
260265
"name": "b1",
261266
},
@@ -335,6 +340,7 @@ exports[`sync - mongodb > sync global data 1`] = `
335340
"last_op_id": "2",
336341
"streams": [
337342
{
343+
"errors": [],
338344
"is_default": true,
339345
"name": "mybucket",
340346
},
@@ -420,14 +426,17 @@ exports[`sync - mongodb > sync interrupts low-priority buckets on new checkpoint
420426
"last_op_id": "4001",
421427
"streams": [
422428
{
429+
"errors": [],
423430
"is_default": true,
424431
"name": "b0a",
425432
},
426433
{
434+
"errors": [],
427435
"is_default": true,
428436
"name": "b0b",
429437
},
430438
{
439+
"errors": [],
431440
"is_default": true,
432441
"name": "b1",
433442
},
@@ -589,6 +598,7 @@ exports[`sync - mongodb > sync legacy non-raw data 1`] = `
589598
"last_op_id": "1",
590599
"streams": [
591600
{
601+
"errors": [],
592602
"is_default": true,
593603
"name": "mybucket",
594604
},
@@ -648,6 +658,7 @@ exports[`sync - mongodb > sync updates to data query only 1`] = `
648658
"last_op_id": "1",
649659
"streams": [
650660
{
661+
"errors": [],
651662
"is_default": true,
652663
"name": "by_user",
653664
},
@@ -732,6 +743,7 @@ exports[`sync - mongodb > sync updates to global data 1`] = `
732743
"last_op_id": "0",
733744
"streams": [
734745
{
746+
"errors": [],
735747
"is_default": true,
736748
"name": "mybucket",
737749
},
@@ -853,6 +865,7 @@ exports[`sync - mongodb > sync updates to parameter query + data 1`] = `
853865
"last_op_id": "0",
854866
"streams": [
855867
{
868+
"errors": [],
856869
"is_default": true,
857870
"name": "by_user",
858871
},
@@ -925,6 +938,7 @@ exports[`sync - mongodb > sync updates to parameter query only 1`] = `
925938
"last_op_id": "0",
926939
"streams": [
927940
{
941+
"errors": [],
928942
"is_default": true,
929943
"name": "by_user",
930944
},

modules/module-postgres-storage/test/src/__snapshots__/storage_sync.test.ts.snap

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ exports[`sync - postgres > compacting data - invalidate checkpoint 1`] = `
2020
"last_op_id": "2",
2121
"streams": [
2222
{
23+
"errors": [],
2324
"is_default": true,
2425
"name": "mybucket",
2526
},
@@ -131,6 +132,7 @@ exports[`sync - postgres > expiring token 1`] = `
131132
"last_op_id": "0",
132133
"streams": [
133134
{
135+
"errors": [],
134136
"is_default": true,
135137
"name": "mybucket",
136138
},
@@ -174,6 +176,7 @@ exports[`sync - postgres > sends checkpoint complete line for empty checkpoint 1
174176
"last_op_id": "1",
175177
"streams": [
176178
{
179+
"errors": [],
177180
"is_default": true,
178181
"name": "mybucket",
179182
},
@@ -252,10 +255,12 @@ exports[`sync - postgres > sync buckets in order 1`] = `
252255
"last_op_id": "2",
253256
"streams": [
254257
{
258+
"errors": [],
255259
"is_default": true,
256260
"name": "b0",
257261
},
258262
{
263+
"errors": [],
259264
"is_default": true,
260265
"name": "b1",
261266
},
@@ -335,6 +340,7 @@ exports[`sync - postgres > sync global data 1`] = `
335340
"last_op_id": "2",
336341
"streams": [
337342
{
343+
"errors": [],
338344
"is_default": true,
339345
"name": "mybucket",
340346
},
@@ -420,14 +426,17 @@ exports[`sync - postgres > sync interrupts low-priority buckets on new checkpoin
420426
"last_op_id": "4001",
421427
"streams": [
422428
{
429+
"errors": [],
423430
"is_default": true,
424431
"name": "b0a",
425432
},
426433
{
434+
"errors": [],
427435
"is_default": true,
428436
"name": "b0b",
429437
},
430438
{
439+
"errors": [],
431440
"is_default": true,
432441
"name": "b1",
433442
},
@@ -589,6 +598,7 @@ exports[`sync - postgres > sync legacy non-raw data 1`] = `
589598
"last_op_id": "1",
590599
"streams": [
591600
{
601+
"errors": [],
592602
"is_default": true,
593603
"name": "mybucket",
594604
},
@@ -648,6 +658,7 @@ exports[`sync - postgres > sync updates to data query only 1`] = `
648658
"last_op_id": "1",
649659
"streams": [
650660
{
661+
"errors": [],
651662
"is_default": true,
652663
"name": "by_user",
653664
},
@@ -732,6 +743,7 @@ exports[`sync - postgres > sync updates to global data 1`] = `
732743
"last_op_id": "0",
733744
"streams": [
734745
{
746+
"errors": [],
735747
"is_default": true,
736748
"name": "mybucket",
737749
},
@@ -853,6 +865,7 @@ exports[`sync - postgres > sync updates to parameter query + data 1`] = `
853865
"last_op_id": "0",
854866
"streams": [
855867
{
868+
"errors": [],
856869
"is_default": true,
857870
"name": "by_user",
858871
},
@@ -925,6 +938,7 @@ exports[`sync - postgres > sync updates to parameter query only 1`] = `
925938
"last_op_id": "0",
926939
"streams": [
927940
{
941+
"errors": [],
928942
"is_default": true,
929943
"name": "by_user",
930944
},

packages/service-core-tests/src/tests/register-data-storage-tests.ts

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,7 @@ bucket_definitions:
422422

423423
const buckets = await sync_rules
424424
.getBucketParameterQuerier(test_utils.querierOptions(parameters))
425-
.queryDynamicBucketDescriptions({
425+
.querier.queryDynamicBucketDescriptions({
426426
getParameterSets(lookups) {
427427
return checkpoint.getParameterSets(lookups);
428428
}
@@ -498,7 +498,7 @@ bucket_definitions:
498498

499499
const buckets = await sync_rules
500500
.getBucketParameterQuerier(test_utils.querierOptions(parameters))
501-
.queryDynamicBucketDescriptions({
501+
.querier.queryDynamicBucketDescriptions({
502502
getParameterSets(lookups) {
503503
return checkpoint.getParameterSets(lookups);
504504
}
@@ -608,11 +608,13 @@ bucket_definitions:
608608

609609
// Test final values - the important part
610610
const buckets = (
611-
await sync_rules.getBucketParameterQuerier(test_utils.querierOptions(parameters)).queryDynamicBucketDescriptions({
612-
getParameterSets(lookups) {
613-
return checkpoint.getParameterSets(lookups);
614-
}
615-
})
611+
await sync_rules
612+
.getBucketParameterQuerier(test_utils.querierOptions(parameters))
613+
.querier.queryDynamicBucketDescriptions({
614+
getParameterSets(lookups) {
615+
return checkpoint.getParameterSets(lookups);
616+
}
617+
})
616618
).map((e) => e.bucket);
617619
buckets.sort();
618620
expect(buckets).toEqual(['by_workspace["workspace1"]', 'by_workspace["workspace3"]']);

packages/service-core/src/sync/BucketChecksumState.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import {
2020
logger as defaultLogger
2121
} from '@powersync/lib-services-framework';
2222
import { JSONBig } from '@powersync/service-jsonbig';
23-
import { BucketParameterQuerier } from '@powersync/service-sync-rules/src/BucketParameterQuerier.js';
23+
import { BucketParameterQuerier, QuerierError } from '@powersync/service-sync-rules/src/BucketParameterQuerier.js';
2424
import { SyncContext } from './SyncContext.js';
2525
import { getIntersection, hasIntersection } from './util.js';
2626

@@ -254,7 +254,12 @@ export class BucketChecksumState {
254254

255255
subscriptions.push({
256256
name: source.name,
257-
is_default: source.subscribedToByDefault
257+
is_default: source.subscribedToByDefault,
258+
errors:
259+
this.parameterState.streamErrors[source.name]?.map((e) => ({
260+
subscription: e.subscription?.opaque_id ?? 'default',
261+
message: e.message
262+
})) ?? []
258263
});
259264
}
260265
}
@@ -382,6 +387,8 @@ export class BucketParameterState {
382387
private readonly includeDefaultStreams: boolean;
383388
// Indexed by the client-side id
384389
private readonly explicitStreamSubscriptions: util.RequestedStreamSubscription[];
390+
// Indexed by descriptor name.
391+
readonly streamErrors: Record<string, QuerierError[]>;
385392
private readonly subscribedStreamNames: Set<string>;
386393
private readonly logger: Logger;
387394
private cachedDynamicBuckets: ResolvedBucket[] | null = null;
@@ -424,11 +431,13 @@ export class BucketParameterState {
424431
this.includeDefaultStreams = subscriptions?.include_defaults ?? true;
425432
this.explicitStreamSubscriptions = explicitStreamSubscriptions;
426433

427-
this.querier = syncRules.getBucketParameterQuerier({
434+
const { querier, errors } = syncRules.getBucketParameterQuerier({
428435
globalParameters: this.syncParams,
429436
hasDefaultStreams: this.includeDefaultStreams,
430437
streams: streamsByName
431438
});
439+
this.querier = querier;
440+
this.streamErrors = Object.groupBy(errors, (e) => e.descriptor) as Record<string, QuerierError[]>;
432441

433442
this.staticBuckets = new Map<string, ResolvedBucket>(
434443
mergeBuckets(this.querier.staticBuckets).map((b) => [b.bucket, b])

packages/service-core/src/util/protocol-types.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,22 @@ export interface StreamDescription {
158158
* For default streams, this field is still `true` if clients have an explicit subscription to the stream.
159159
*/
160160
is_default: boolean;
161+
162+
/**
163+
* If some subscriptions on this stream could not be resolved, e.g. due to an error, tis
164+
*/
165+
errors: StreamSubscriptionError[];
166+
}
167+
168+
export interface StreamSubscriptionError {
169+
/**
170+
* The subscription that errored - either the default subscription or some of the explicit subscriptions.
171+
*/
172+
subscription: 'default' | number;
173+
/**
174+
* A message describing the error on the subscription.
175+
*/
176+
message: string;
161177
}
162178

163179
export interface Checkpoint {

0 commit comments

Comments
 (0)