Skip to content

Commit 4de7204

Browse files
Segments cache: replace addToSegment, removeFromSegment and setChangeNumber with update method for simplicity
1 parent 6797cfb commit 4de7204

File tree

6 files changed

+63
-29
lines changed

6 files changed

+63
-29
lines changed

src/storages/AbstractSegmentsCacheSync.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ export abstract class AbstractSegmentsCacheSync implements ISegmentsCacheSync {
3939
* For client-side synchronizer: the method is not used.
4040
*/
4141
registerSegments(names: string[]): boolean { return false; }
42+
update(name: string, addedKeys: string[], removedKeys: string[], changeNumber: number) { return false; }
4243

4344
/**
4445
* For server-side synchronizer: get the list of segments to fetch changes.

src/storages/inMemory/SegmentsCacheInMemory.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,18 @@ export class SegmentsCacheInMemory extends AbstractSegmentsCacheSync {
1111
private segmentCache: Record<string, ISet<string>> = {};
1212
private segmentChangeNumber: Record<string, number> = {};
1313

14+
update(name: string, addedKeys: string[], removedKeys: string[], changeNumber: number) {
15+
const keySet = this.segmentCache[name] || new _Set<string>();
16+
17+
addedKeys.forEach(k => keySet.add(k));
18+
removedKeys.forEach(k => keySet.delete(k));
19+
20+
this.segmentCache[name] = keySet;
21+
this.segmentChangeNumber[name] = changeNumber;
22+
23+
return addedKeys.length > 0 || removedKeys.length > 0;
24+
}
25+
1426
addToSegment(name: string, segmentKeys: string[]): boolean {
1527
const values = this.segmentCache[name];
1628
const keySet = values ? values : new _Set<string>();

src/storages/inRedis/SegmentsCacheInRedis.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,23 @@ export class SegmentsCacheInRedis implements ISegmentsCacheAsync {
1717
this.keys = keys;
1818
}
1919

20+
/**
21+
* Update the given segment `name` with the lists of `addedKeys`, `removedKeys` and `changeNumber`.
22+
* The returned promise is resolved if the operation success, with `true` if the segment was updated (i.e., some key was added or removed),
23+
* or rejected if it fails (e.g., Redis operation fails).
24+
*/
25+
update(name: string, addedKeys: string[], removedKeys: string[], changeNumber: number) {
26+
const segmentKey = this.keys.buildSegmentNameKey(name);
27+
28+
return Promise.all([
29+
addedKeys.length && this.redis.sadd(segmentKey, addedKeys),
30+
removedKeys.length && this.redis.srem(segmentKey, removedKeys),
31+
this.redis.set(this.keys.buildSegmentTillKey(name), changeNumber + '')
32+
]).then(() => {
33+
return addedKeys.length > 0 || removedKeys.length > 0;
34+
});
35+
}
36+
2037
addToSegment(name: string, segmentKeys: string[]) {
2138
const segmentKey = this.keys.buildSegmentNameKey(name);
2239

src/storages/pluggable/SegmentsCachePluggable.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,23 @@ export class SegmentsCachePluggable implements ISegmentsCacheAsync {
2222
this.wrapper = wrapper;
2323
}
2424

25+
/**
26+
* Update the given segment `name` with the lists of `addedKeys`, `removedKeys` and `changeNumber`.
27+
* The returned promise is resolved if the operation success, with `true` if the segment was updated (i.e., some key was added or removed),
28+
* or rejected if it fails (e.g., wrapper operation fails).
29+
*/
30+
update(name: string, addedKeys: string[], removedKeys: string[], changeNumber: number) {
31+
const segmentKey = this.keys.buildSegmentNameKey(name);
32+
33+
return Promise.all<any>([
34+
addedKeys.length && this.wrapper.addItems(segmentKey, addedKeys),
35+
removedKeys.length && this.wrapper.removeItems(segmentKey, removedKeys),
36+
this.wrapper.set(this.keys.buildSegmentTillKey(name), changeNumber + '')
37+
]).then(() => {
38+
return addedKeys.length > 0 || removedKeys.length > 0;
39+
});
40+
}
41+
2542
/**
2643
* Add a list of `segmentKeys` to the given segment `name`.
2744
* The returned promise is resolved when the operation success

src/storages/types.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,7 @@ export interface ISegmentsCacheBase {
258258
getRegisteredSegments(): MaybeThenable<string[]> // only for Server-Side
259259
setChangeNumber(name: string, changeNumber: number): MaybeThenable<boolean | void> // only for Server-Side
260260
getChangeNumber(name: string): MaybeThenable<number> // only for Server-Side
261+
update(name: string, addedKeys: string[], removedKeys: string[], changeNumber: number): MaybeThenable<boolean> // only for Server-Side
261262
clear(): MaybeThenable<boolean | void>
262263
}
263264

@@ -271,6 +272,7 @@ export interface ISegmentsCacheSync extends ISegmentsCacheBase {
271272
getKeysCount(): number // only used for telemetry
272273
setChangeNumber(name: string, changeNumber: number): boolean | void
273274
getChangeNumber(name?: string): number
275+
update(name: string, addedKeys: string[], removedKeys: string[], changeNumber: number): boolean // only for Server-Side
274276
resetSegments(segmentsData: MySegmentsData | IMySegmentsResponse): boolean // only for Sync Client-Side
275277
clear(): void
276278
}
@@ -283,6 +285,7 @@ export interface ISegmentsCacheAsync extends ISegmentsCacheBase {
283285
getRegisteredSegments(): Promise<string[]>
284286
setChangeNumber(name: string, changeNumber: number): Promise<boolean | void>
285287
getChangeNumber(name: string): Promise<number>
288+
update(name: string, addedKeys: string[], removedKeys: string[], changeNumber: number): Promise<boolean>
286289
clear(): Promise<boolean | void>
287290
}
288291

src/sync/polling/updaters/segmentChangesUpdater.ts

Lines changed: 13 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
11
import { ISegmentChangesFetcher } from '../fetchers/types';
22
import { ISegmentsCacheBase } from '../../../storages/types';
33
import { IReadinessManager } from '../../../readiness/types';
4-
import { MaybeThenable } from '../../../dtos/types';
5-
import { findIndex } from '../../../utils/lang';
64
import { SDK_SEGMENTS_ARRIVED } from '../../../readiness/constants';
75
import { ILogger } from '../../../logger/types';
86
import { LOG_PREFIX_INSTANTIATION, LOG_PREFIX_SYNC_SEGMENTS } from '../../../logger/constants';
9-
import { thenable } from '../../../utils/promise/thenable';
107

118
type ISegmentChangesUpdater = (fetchOnlyNew?: boolean, segmentName?: string, noCache?: boolean, till?: number) => Promise<boolean>
129

@@ -30,31 +27,22 @@ export function segmentChangesUpdaterFactory(
3027

3128
let readyOnAlreadyExistentState = true;
3229

33-
function updateSegment(segmentName: string, noCache?: boolean, till?: number, fetchOnlyNew?: boolean) {
30+
function updateSegment(segmentName: string, noCache?: boolean, till?: number, fetchOnlyNew?: boolean): Promise<boolean> {
3431
log.debug(`${LOG_PREFIX_SYNC_SEGMENTS}Processing segment ${segmentName}`);
3532
let sincePromise = Promise.resolve(segments.getChangeNumber(segmentName));
3633

3734
return sincePromise.then(since => {
3835
// if fetchOnlyNew flag, avoid processing already fetched segments
39-
if (fetchOnlyNew && since !== -1) return -1;
40-
41-
return segmentChangesFetcher(since, segmentName, noCache, till).then(function (changes) {
42-
let changeNumber = -1;
43-
const results: MaybeThenable<boolean | void>[] = [];
44-
changes.forEach(x => {
45-
if (x.added.length > 0) results.push(segments.addToSegment(segmentName, x.added));
46-
if (x.removed.length > 0) results.push(segments.removeFromSegment(segmentName, x.removed));
47-
if (x.added.length > 0 || x.removed.length > 0) {
48-
results.push(segments.setChangeNumber(segmentName, x.till));
49-
changeNumber = x.till;
50-
}
51-
52-
log.debug(`${LOG_PREFIX_SYNC_SEGMENTS}Processed ${segmentName} with till = ${x.till}. Added: ${x.added.length}. Removed: ${x.removed.length}`);
36+
return fetchOnlyNew && since !== -1 ?
37+
false :
38+
segmentChangesFetcher(since, segmentName, noCache, till).then((changes) => {
39+
return Promise.all(changes.map(x => {
40+
log.debug(`${LOG_PREFIX_SYNC_SEGMENTS}Processing ${segmentName} with till = ${x.till}. Added: ${x.added.length}. Removed: ${x.removed.length}`);
41+
return segments.update(x.name, x.added, x.removed, x.till);
42+
})).then((updates) => {
43+
return updates.some(update => update);
44+
});
5345
});
54-
// If at least one storage operation result is a promise, join all in a single promise.
55-
if (results.some(result => thenable(result))) return Promise.all(results).then(() => changeNumber);
56-
return changeNumber;
57-
});
5846
});
5947
}
6048
/**
@@ -75,16 +63,12 @@ export function segmentChangesUpdaterFactory(
7563
let segmentsPromise = Promise.resolve(segmentName ? [segmentName] : segments.getRegisteredSegments());
7664

7765
return segmentsPromise.then(segmentNames => {
78-
// Async fetchers are collected here.
79-
const updaters: Promise<number>[] = [];
80-
81-
for (let index = 0; index < segmentNames.length; index++) {
82-
updaters.push(updateSegment(segmentNames[index], noCache, till, fetchOnlyNew));
83-
}
66+
// Async fetchers
67+
const updaters = segmentNames.map(segmentName => updateSegment(segmentName, noCache, till, fetchOnlyNew));
8468

8569
return Promise.all(updaters).then(shouldUpdateFlags => {
8670
// if at least one segment fetch succeeded, mark segments ready
87-
if (findIndex(shouldUpdateFlags, v => v !== -1) !== -1 || readyOnAlreadyExistentState) {
71+
if (shouldUpdateFlags.some(update => update) || readyOnAlreadyExistentState) {
8872
readyOnAlreadyExistentState = false;
8973
if (readiness) readiness.segments.emit(SDK_SEGMENTS_ARRIVED);
9074
}

0 commit comments

Comments
 (0)