Skip to content

Commit 1dbccfa

Browse files
committed
Unit tests for bucket checksum state
1 parent 2e1ecc1 commit 1dbccfa

13 files changed

+379
-104
lines changed

packages/service-core/src/routes/endpoints/socket-route.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,6 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
5858
return;
5959
}
6060

61-
const syncParams = new RequestParameters(context.token_payload!, params.parameters ?? {});
62-
6361
const {
6462
storageEngine: { activeBucketStorage }
6563
} = service_context;
@@ -95,7 +93,6 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
9593
...params,
9694
binary_data: true // always true for web sockets
9795
},
98-
syncParams,
9996
token: context!.token_payload!,
10097
tokenStreamOptions: {
10198
// RSocket handles keepalive events by default

packages/service-core/src/routes/endpoints/sync-stream.ts

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,6 @@ export const syncStreamed = routeDefinition({
5252
});
5353
}
5454

55-
const params: util.StreamingSyncRequest = payload.params;
56-
const syncParams = new RequestParameters(payload.context.token_payload!, payload.params.parameters ?? {});
57-
5855
const bucketStorage = await storageEngine.activeBucketStorage.getActiveStorage();
5956

6057
if (bucketStorage == null) {
@@ -75,8 +72,7 @@ export const syncStreamed = routeDefinition({
7572
syncContext: syncContext,
7673
bucketStorage,
7774
syncRules: syncRules,
78-
params,
79-
syncParams,
75+
params: payload.params,
8076
token: payload.context.token_payload!,
8177
tracker,
8278
signal: controller.signal,

packages/service-core/src/sync/BucketChecksumState.ts

Lines changed: 31 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import {
44
BucketSource,
55
BucketSourceType,
66
RequestedStream,
7+
RequestJwtPayload,
78
RequestParameters,
89
ResolvedBucket,
910
SqlSyncRules
@@ -28,10 +29,9 @@ export interface BucketChecksumStateOptions {
2829
syncContext: SyncContext;
2930
bucketStorage: BucketChecksumStateStorage;
3031
syncRules: SqlSyncRules;
31-
syncParams: RequestParameters;
32+
tokenPayload: RequestJwtPayload;
3233
syncRequest: util.StreamingSyncRequest;
3334
logger?: Logger;
34-
initialBucketPositions?: { name: string; after: util.InternalOpId }[];
3535
}
3636

3737
type BucketSyncState = {
@@ -79,14 +79,14 @@ export class BucketChecksumState {
7979
options.syncContext,
8080
options.bucketStorage,
8181
options.syncRules,
82-
options.syncParams,
82+
options.tokenPayload,
8383
options.syncRequest,
8484
this.logger
8585
);
8686
this.bucketDataPositions = new Map();
8787

88-
for (let { name, after: start } of options.initialBucketPositions ?? []) {
89-
this.bucketDataPositions.set(name, { start_op_id: start });
88+
for (let { name, after: start } of options.syncRequest.buckets ?? []) {
89+
this.bucketDataPositions.set(name, { start_op_id: BigInt(start) });
9090
}
9191
}
9292

@@ -199,7 +199,7 @@ export class BucketChecksumState {
199199
}));
200200
bucketsToFetch = [...generateBucketsToFetch].map((b) => {
201201
return {
202-
...bucketDescriptionMap.get(b)!,
202+
priority: bucketDescriptionMap.get(b)!.priority,
203203
bucket: b
204204
};
205205
});
@@ -233,11 +233,11 @@ export class BucketChecksumState {
233233
message += `buckets: ${allBuckets.length} ${limitedBuckets(allBuckets, 20)}`;
234234
this.logger.info(message, { checkpoint: base.checkpoint, user_id: user_id, buckets: allBuckets.length });
235235
};
236-
bucketsToFetch = allBuckets;
236+
bucketsToFetch = allBuckets.map((b) => ({ bucket: b.bucket, priority: b.priority }));
237237

238238
const subscriptions: util.StreamDescription[] = [];
239239
for (const source of this.parameterState.syncRules.bucketSources) {
240-
if (source.type == BucketSourceType.SYNC_STREAM && this.parameterState.isSubscribedToStream(source)) {
240+
if (this.parameterState.isSubscribedToStream(source)) {
241241
subscriptions.push({
242242
name: source.name,
243243
is_default: source.subscribedToByDefault
@@ -360,7 +360,11 @@ export class BucketParameterState {
360360
public readonly syncRules: SqlSyncRules;
361361
public readonly syncParams: RequestParameters;
362362
private readonly querier: BucketParameterQuerier;
363-
private readonly staticBuckets: Map<string, BucketDescription>;
363+
/**
364+
* Static buckets. This map is guaranteed not to change during a request, since resolving static buckets can only
365+
* take request parameters into account,
366+
*/
367+
private readonly staticBuckets: Map<string, ResolvedBucket>;
364368
private readonly includeDefaultStreams: boolean;
365369
// Indexed by the client-side id
366370
private readonly explicitStreamSubscriptions: Record<string, util.RequestedStreamSubscription>;
@@ -375,22 +379,22 @@ export class BucketParameterState {
375379
context: SyncContext,
376380
bucketStorage: BucketChecksumStateStorage,
377381
syncRules: SqlSyncRules,
378-
syncParams: RequestParameters,
382+
tokenPayload: RequestJwtPayload,
379383
request: util.StreamingSyncRequest,
380384
logger: Logger
381385
) {
382386
this.context = context;
383387
this.bucketStorage = bucketStorage;
384388
this.syncRules = syncRules;
385-
this.syncParams = syncParams;
389+
this.syncParams = new RequestParameters(tokenPayload, request.parameters ?? {});
386390
this.logger = logger;
387391

388392
const idToStreamSubscription: Record<string, util.RequestedStreamSubscription> = {};
389393
const streamsByName: Record<string, RequestedStream[]> = {};
390-
const subscriptions = request.subscriptions;
394+
const subscriptions = request.streams;
391395
if (subscriptions) {
392-
for (const subscription of subscriptions.opened) {
393-
idToStreamSubscription[subscription.stream] = subscription;
396+
for (const subscription of subscriptions.subscriptions) {
397+
idToStreamSubscription[subscription.client_id] = subscription;
394398

395399
const syncRuleStream: RequestedStream = {
396400
parameters: subscription.parameters ?? {},
@@ -412,7 +416,7 @@ export class BucketParameterState {
412416
streams: streamsByName
413417
});
414418

415-
this.staticBuckets = new Map<string, BucketDescription>(
419+
this.staticBuckets = new Map<string, ResolvedBucket>(
416420
mergeBuckets(this.querier.staticBuckets).map((b) => [b.bucket, b])
417421
);
418422
this.lookups = new Set<string>(this.querier.parameterQueryLookups.map((l) => JSONBig.stringify(l.values)));
@@ -441,14 +445,13 @@ export class BucketParameterState {
441445
}
442446

443447
return {
444-
definition: description.definition,
445448
bucket: description.bucket,
446449
priority: priorityOverride ?? description.priority,
447450
subscriptions: description.inclusion_reasons.map((reason) => {
448451
if (reason == 'default') {
449-
return { def: description.definition };
452+
return { default: 0 }; // TODO
450453
} else {
451-
return { sub: reason.subscription };
454+
return reason.subscription;
452455
}
453456
})
454457
};
@@ -489,19 +492,19 @@ export class BucketParameterState {
489492
* For static buckets, we can keep track of which buckets have been updated.
490493
*/
491494
private async getCheckpointUpdateStatic(checkpoint: storage.StorageCheckpointUpdate): Promise<CheckpointUpdate> {
492-
const querier = this.querier;
495+
const staticBuckets = [...this.staticBuckets.values()];
493496
const update = checkpoint.update;
494497

495498
if (update.invalidateDataBuckets) {
496499
return {
497-
buckets: querier.staticBuckets,
500+
buckets: staticBuckets,
498501
updatedBuckets: INVALIDATE_ALL_BUCKETS
499502
};
500503
}
501504

502505
const updatedBuckets = new Set<string>(getIntersection(this.staticBuckets, update.updatedDataBuckets));
503506
return {
504-
buckets: querier.staticBuckets,
507+
buckets: staticBuckets,
505508
updatedBuckets
506509
};
507510
}
@@ -512,7 +515,7 @@ export class BucketParameterState {
512515
private async getCheckpointUpdateDynamic(checkpoint: storage.StorageCheckpointUpdate): Promise<CheckpointUpdate> {
513516
const querier = this.querier;
514517
const storage = this.bucketStorage;
515-
const staticBuckets = querier.staticBuckets;
518+
const staticBuckets = this.staticBuckets.values();
516519
const update = checkpoint.update;
517520

518521
let hasParameterChange = false;
@@ -556,7 +559,7 @@ export class BucketParameterState {
556559
}
557560
}
558561
}
559-
const allBuckets = [...staticBuckets, ...dynamicBuckets];
562+
const allBuckets = [...staticBuckets, ...mergeBuckets(dynamicBuckets)];
560563

561564
if (invalidateDataBuckets) {
562565
return {
@@ -632,15 +635,15 @@ function limitedBuckets(buckets: string[] | { bucket: string }[], limit: number)
632635
* bucket.
633636
*/
634637
function mergeBuckets(buckets: ResolvedBucket[]): ResolvedBucket[] {
635-
const byDefinition: Record<string, ResolvedBucket> = {};
638+
const byBucketId: Record<string, ResolvedBucket> = {};
636639

637640
for (const bucket of buckets) {
638-
if (Object.hasOwn(byDefinition, bucket.definition)) {
639-
byDefinition[bucket.definition].inclusion_reasons.push(...bucket.inclusion_reasons);
641+
if (Object.hasOwn(byBucketId, bucket.bucket)) {
642+
byBucketId[bucket.bucket].inclusion_reasons.push(...bucket.inclusion_reasons);
640643
} else {
641-
byDefinition[bucket.definition] = bucket;
644+
byBucketId[bucket.bucket] = structuredClone(bucket);
642645
}
643646
}
644647

645-
return Object.values(byDefinition);
648+
return Object.values(byBucketId);
646649
}

packages/service-core/src/sync/sync.ts

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
import { JSONBig, JsonContainer } from '@powersync/service-jsonbig';
2-
import { BucketDescription, BucketPriority, RequestParameters, SqlSyncRules } from '@powersync/service-sync-rules';
2+
import {
3+
BucketDescription,
4+
BucketPriority,
5+
RequestJwtPayload,
6+
RequestParameters,
7+
SqlSyncRules
8+
} from '@powersync/service-sync-rules';
39

410
import { AbortError } from 'ix/aborterror.js';
511

@@ -19,7 +25,6 @@ export interface SyncStreamParameters {
1925
bucketStorage: storage.SyncRulesBucketStorage;
2026
syncRules: SqlSyncRules;
2127
params: util.StreamingSyncRequest;
22-
syncParams: RequestParameters;
2328
token: auth.JwtPayload;
2429
logger?: Logger;
2530
/**
@@ -34,8 +39,7 @@ export interface SyncStreamParameters {
3439
export async function* streamResponse(
3540
options: SyncStreamParameters
3641
): AsyncIterable<util.StreamingSyncLine | string | null> {
37-
const { syncContext, bucketStorage, syncRules, params, syncParams, token, tokenStreamOptions, tracker, signal } =
38-
options;
42+
const { syncContext, bucketStorage, syncRules, params, token, tokenStreamOptions, tracker, signal } = options;
3943
const logger = options.logger ?? defaultLogger;
4044

4145
// We also need to be able to abort, so we create our own controller.
@@ -58,7 +62,7 @@ export async function* streamResponse(
5862
bucketStorage,
5963
syncRules,
6064
params,
61-
syncParams,
65+
token,
6266
tracker,
6367
controller.signal,
6468
logger
@@ -86,25 +90,22 @@ async function* streamResponseInner(
8690
bucketStorage: storage.SyncRulesBucketStorage,
8791
syncRules: SqlSyncRules,
8892
params: util.StreamingSyncRequest,
89-
syncParams: RequestParameters,
93+
tokenPayload: RequestJwtPayload,
9094
tracker: RequestTracker,
9195
signal: AbortSignal,
9296
logger: Logger
9397
): AsyncGenerator<util.StreamingSyncLine | string | null> {
9498
const { raw_data, binary_data } = params;
9599

96-
const checkpointUserId = util.checkpointUserId(syncParams.tokenParameters.user_id as string, params.client_id);
100+
const userId = tokenPayload.sub;
101+
const checkpointUserId = util.checkpointUserId(userId as string, params.client_id);
97102

98103
const checksumState = new BucketChecksumState({
99104
syncContext,
100105
bucketStorage,
101106
syncRules,
102-
syncParams,
107+
tokenPayload,
103108
syncRequest: params,
104-
initialBucketPositions: params.buckets?.map((bucket) => ({
105-
name: bucket.name,
106-
after: BigInt(bucket.after)
107-
})),
108109
logger: logger
109110
});
110111
const stream = bucketStorage.watchCheckpointChanges({
@@ -229,7 +230,7 @@ async function* streamResponseInner(
229230
onRowsSent: markOperationsSent,
230231
abort_connection: signal,
231232
abort_batch: abortCheckpointSignal,
232-
user_id: syncParams.userId,
233+
user_id: userId,
233234
// Passing null here will emit a full sync complete message at the end. If we pass a priority, we'll emit a partial
234235
// sync complete message instead.
235236
forPriority: !isLast ? priority : null,

packages/service-core/src/util/protocol-types.ts

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ export const RequestedStreamSubscription = t.object({
2323
stream: t.string,
2424
/**
2525
* An opaque textual identifier assigned to this request by the client.
26-
*
27-
* Wh
2826
*/
2927
client_id: t.string,
3028
/**
@@ -56,7 +54,7 @@ export const StreamSubscriptionRequest = t.object({
5654
/**
5755
* An array of sync streams the client has opened explicitly.
5856
*/
59-
opened: t.array(RequestedStreamSubscription)
57+
subscriptions: t.array(RequestedStreamSubscription)
6058
});
6159

6260
export type StreamSubscriptionRequest = t.Decoded<typeof StreamSubscriptionRequest>;
@@ -100,7 +98,7 @@ export const StreamingSyncRequest = t.object({
10098
/**
10199
* If the client is aware of streams, an array of streams the client has opened.
102100
*/
103-
subscriptions: StreamSubscriptionRequest.optional()
101+
streams: StreamSubscriptionRequest.optional()
104102
});
105103

106104
export type StreamingSyncRequest = t.Decoded<typeof StreamingSyncRequest>;
@@ -227,20 +225,32 @@ export type BucketSubscriptionReason = BucketDerivedFromDefaultStream | BucketDe
227225

228226
/**
229227
* A bucket has been included in a checkpoint because it's part of a default stream.
230-
*
231-
* The string is the name of the stream definition.
232228
*/
233-
export type BucketDerivedFromDefaultStream = { def: string };
229+
export type BucketDerivedFromDefaultStream = {
230+
/**
231+
* The index (into {@link Checkpoint.streams}) of the stream defining the bucket.
232+
*/
233+
default: number;
234+
};
234235

235236
/**
236237
* The bucket has been included in a checkpoint because it's part of a stream that a client has explicitly subscribed
237238
* to.
238239
*
239240
* The string is the client id associated with the subscription in {@link RequestedStreamSubscription}.
240241
*/
241-
export type BucketDerivedFromExplicitSubscription = { sub: string };
242+
export type BucketDerivedFromExplicitSubscription = string;
242243

243-
export interface ClientBucketDescription extends BucketDescription {
244+
export interface ClientBucketDescription {
245+
/**
246+
* An opaque id of the bucket.
247+
*/
248+
bucket: string;
249+
/**
250+
* The priority used to synchronize this bucket, derived from its definition and an optional priority override from
251+
* the stream subscription.
252+
*/
253+
priority: BucketPriority;
244254
subscriptions: BucketSubscriptionReason[];
245255
}
246256

0 commit comments

Comments
 (0)