Skip to content

Commit e6b229f

Browse files
committed
Review feedback
1 parent 42dc376 commit e6b229f

File tree

4 files changed

+36
-16
lines changed

4 files changed

+36
-16
lines changed

packages/service-core/src/sync/BucketChecksumState.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,7 @@ export class BucketParameterState {
381381
private readonly staticBuckets: Map<string, ResolvedBucket>;
382382
private readonly includeDefaultStreams: boolean;
383383
// Indexed by the client-side id
384-
private readonly explicitStreamSubscriptions: Record<string, util.RequestedStreamSubscription>;
384+
private readonly explicitStreamSubscriptions: util.RequestedStreamSubscription[];
385385
private readonly subscribedStreamNames: Set<string>;
386386
private readonly logger: Logger;
387387
private cachedDynamicBuckets: ResolvedBucket[] | null = null;
@@ -403,16 +403,16 @@ export class BucketParameterState {
403403
this.syncParams = new RequestParameters(tokenPayload, request.parameters ?? {});
404404
this.logger = logger;
405405

406-
const idToStreamSubscription: Record<string, util.RequestedStreamSubscription> = {};
407406
const streamsByName: Record<string, RequestedStream[]> = {};
408407
const subscriptions = request.streams;
408+
const explicitStreamSubscriptions: util.RequestedStreamSubscription[] = subscriptions?.subscriptions ?? [];
409409
if (subscriptions) {
410-
for (const subscription of subscriptions.subscriptions) {
411-
idToStreamSubscription[subscription.client_id] = subscription;
410+
for (let i = 0; i < explicitStreamSubscriptions.length; i++) {
411+
const subscription = explicitStreamSubscriptions[i];
412412

413413
const syncRuleStream: RequestedStream = {
414414
parameters: subscription.parameters ?? {},
415-
opaque_id: subscription.client_id
415+
opaque_id: i
416416
};
417417
if (Object.hasOwn(streamsByName, subscription.stream)) {
418418
streamsByName[subscription.stream].push(syncRuleStream);
@@ -422,7 +422,7 @@ export class BucketParameterState {
422422
}
423423
}
424424
this.includeDefaultStreams = subscriptions?.include_defaults ?? true;
425-
this.explicitStreamSubscriptions = idToStreamSubscription;
425+
this.explicitStreamSubscriptions = explicitStreamSubscriptions;
426426

427427
this.querier = syncRules.getBucketParameterQuerier({
428428
globalParameters: this.syncParams,
@@ -469,7 +469,7 @@ export class BucketParameterState {
469469
const stream = description.definition;
470470
return { default: lookupIndex.get(stream)! };
471471
} else {
472-
return reason.subscription;
472+
return { sub: reason.subscription };
473473
}
474474
})
475475
};

packages/service-core/src/util/protocol-types.ts

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,6 @@ export const RequestedStreamSubscription = t.object({
2121
* The defined name of the stream as it appears in sync stream definitions.
2222
*/
2323
stream: t.string,
24-
/**
25-
* An opaque textual identifier assigned to this request by the client.
26-
*/
27-
client_id: t.string,
2824
/**
2925
* An optional dictionary of parameters to pass to this specific stream.
3026
*/
@@ -151,14 +147,35 @@ export type StreamingSyncLine =
151147
export type ProtocolOpId = string;
152148

153149
export interface StreamDescription {
150+
/**
151+
* The name of the stream as it appears in the sync configuration.
152+
*/
154153
name: string;
154+
155+
/**
156+
* Whether this stream is subscribed to by default.
157+
*
158+
* For default streams, this field is still `true` if clients have an explicit subscription to the stream.
159+
*/
155160
is_default: boolean;
156161
}
157162

158163
export interface Checkpoint {
159164
last_op_id: ProtocolOpId;
160165
write_checkpoint?: ProtocolOpId;
161166
buckets: CheckpointBucket[];
167+
168+
/**
169+
* All streams that the client is subscribed to.
170+
*
171+
* This field has two purposes:
172+
*
173+
* 1. It allows clients to determine which of their subscriptions actually works. E.g. if a user does
174+
* `db.syncStream('non_existent_stream').subscribe()`, clients don't immediately know that the stream doesn't
175+
* exist. Only after the next `checkpoint` line can they query this field and mark unresolved subscriptions.
176+
*. 2. It allows clients to learn which default streams they have been subscribed to. This is relevant for APIs
177+
* listing all streams on the client-side.
178+
*/
162179
streams: StreamDescription[];
163180
}
164181

@@ -236,10 +253,13 @@ export type BucketDerivedFromDefaultStream = {
236253
/**
237254
* The bucket has been included in a checkpoint because it's part of a stream that a client has explicitly subscribed
238255
* to.
239-
*
240-
* The string is the client id associated with the subscription in {@link RequestedStreamSubscription}.
241256
*/
242-
export type BucketDerivedFromExplicitSubscription = string;
257+
export type BucketDerivedFromExplicitSubscription = {
258+
/**
259+
* The index (into {@link StreamSubscriptionRequest.subscriptions}) of the subscription yielding this bucket.
260+
*/
261+
sub: number;
262+
};
243263

244264
export interface ClientBucketDescription {
245265
/**

packages/sync-rules/src/BucketDescription.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,4 +45,4 @@ export interface ResolvedBucket extends BucketDescription {
4545
inclusion_reasons: BucketInclusionReason[];
4646
}
4747

48-
export type BucketInclusionReason = 'default' | { subscription: string };
48+
export type BucketInclusionReason = 'default' | { subscription: number };

packages/sync-rules/src/SqlSyncRules.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ export interface RequestedStream {
5252
* An opaque id of the stream subscription, used to associate buckets with the stream subscriptions that have caused
5353
* them to be included.
5454
*/
55-
opaque_id: string;
55+
opaque_id: number;
5656
}
5757

5858
export interface GetQuerierOptions {

0 commit comments

Comments
 (0)