1
- import { IPluggableStorageWrapper , IStorageAsyncFactory , IStorageFactoryParams , ITelemetryCacheAsync } from '../types' ;
1
+ import { IPluggableStorageWrapper , IStorageAsync , IStorageAsyncFactory , IStorageFactoryParams , ITelemetryCacheAsync } from '../types' ;
2
2
3
3
import { KeyBuilderSS } from '../KeyBuilderSS' ;
4
4
import { SplitsCachePluggable } from './SplitsCachePluggable' ;
@@ -62,12 +62,11 @@ export function PluggableStorage(options: PluggableStorageOptions): IStorageAsyn
62
62
63
63
const prefix = validatePrefix ( options . prefix ) ;
64
64
65
- function PluggableStorageFactory ( params : IStorageFactoryParams ) {
65
+ function PluggableStorageFactory ( params : IStorageFactoryParams ) : IStorageAsync {
66
66
const { onReadyCb, settings, settings : { log, mode, sync : { impressionsMode } , scheduler : { impressionsQueueSize, eventsQueueSize } } } = params ;
67
67
const metadata = metadataBuilder ( settings ) ;
68
68
const keys = new KeyBuilderSS ( prefix , metadata ) ;
69
69
const wrapper = wrapperAdapter ( log , options . wrapper ) ;
70
- let connectPromise : Promise < void > ;
71
70
72
71
const isSyncronizer = mode === undefined ; // If mode is not defined, the synchronizer is running
73
72
const isPartialConsumer = mode === CONSUMER_PARTIAL_MODE ;
@@ -90,6 +89,35 @@ export function PluggableStorage(options: PluggableStorageOptions): IStorageAsyn
90
89
new UniqueKeysCachePluggable ( log , keys . buildUniqueKeysKey ( ) , wrapper ) :
91
90
undefined ;
92
91
92
+ // Connects to wrapper and emits SDK_READY event on main client
93
+ const connectPromise = wrapper . connect ( ) . then ( ( ) => {
94
+ if ( isSyncronizer ) {
95
+ // In standalone or producer mode, clear storage if SDK key or feature flag filter has changed
96
+ return wrapper . get ( keys . buildHashKey ( ) ) . then ( ( hash ) => {
97
+ const currentHash = getStorageHash ( settings ) ;
98
+ if ( hash !== currentHash ) {
99
+ log . info ( LOG_PREFIX + 'Storage HASH has changed (SDK key, flags filter criteria or flags spec version was modified). Clearing cache' ) ;
100
+ return wrapper . getKeysByPrefix ( `${ keys . prefix } .` ) . then ( storageKeys => {
101
+ return Promise . all ( storageKeys . map ( storageKey => wrapper . del ( storageKey ) ) ) ;
102
+ } ) . then ( ( ) => wrapper . set ( keys . buildHashKey ( ) , currentHash ) ) ;
103
+ }
104
+ } ) . then ( ( ) => {
105
+ onReadyCb ( ) ;
106
+ } ) ;
107
+ } else {
108
+ // Start periodic flush of async storages if not running synchronizer (producer mode)
109
+ if ( impressionCountsCache && ( impressionCountsCache as ImpressionCountsCachePluggable ) . start ) ( impressionCountsCache as ImpressionCountsCachePluggable ) . start ( ) ;
110
+ if ( uniqueKeysCache && ( uniqueKeysCache as UniqueKeysCachePluggable ) . start ) ( uniqueKeysCache as UniqueKeysCachePluggable ) . start ( ) ;
111
+ if ( telemetry && ( telemetry as ITelemetryCacheAsync ) . recordConfig ) ( telemetry as ITelemetryCacheAsync ) . recordConfig ( ) ;
112
+
113
+ onReadyCb ( ) ;
114
+ }
115
+ } ) . catch ( ( e ) => {
116
+ e = e || new Error ( 'Error connecting wrapper' ) ;
117
+ onReadyCb ( e ) ;
118
+ return e ; // Propagate error for shared clients
119
+ } ) ;
120
+
93
121
return {
94
122
splits : new SplitsCachePluggable ( log , keys , wrapper , settings . sync . __splitFiltersValidation ) ,
95
123
segments : new SegmentsCachePluggable ( log , keys , wrapper ) ,
@@ -99,39 +127,6 @@ export function PluggableStorage(options: PluggableStorageOptions): IStorageAsyn
99
127
telemetry,
100
128
uniqueKeys : uniqueKeysCache ,
101
129
102
- init ( ) {
103
- if ( connectPromise ) return connectPromise ;
104
-
105
- // Connects to wrapper and emits SDK_READY event on main client
106
- return connectPromise = wrapper . connect ( ) . then ( ( ) => {
107
- if ( isSyncronizer ) {
108
- // In standalone or producer mode, clear storage if SDK key or feature flag filter has changed
109
- return wrapper . get ( keys . buildHashKey ( ) ) . then ( ( hash ) => {
110
- const currentHash = getStorageHash ( settings ) ;
111
- if ( hash !== currentHash ) {
112
- log . info ( LOG_PREFIX + 'Storage HASH has changed (SDK key, flags filter criteria or flags spec version was modified). Clearing cache' ) ;
113
- return wrapper . getKeysByPrefix ( `${ keys . prefix } .` ) . then ( storageKeys => {
114
- return Promise . all ( storageKeys . map ( storageKey => wrapper . del ( storageKey ) ) ) ;
115
- } ) . then ( ( ) => wrapper . set ( keys . buildHashKey ( ) , currentHash ) ) ;
116
- }
117
- } ) . then ( ( ) => {
118
- onReadyCb ( ) ;
119
- } ) ;
120
- } else {
121
- // Start periodic flush of async storages if not running synchronizer (producer mode)
122
- if ( impressionCountsCache && ( impressionCountsCache as ImpressionCountsCachePluggable ) . start ) ( impressionCountsCache as ImpressionCountsCachePluggable ) . start ( ) ;
123
- if ( uniqueKeysCache && ( uniqueKeysCache as UniqueKeysCachePluggable ) . start ) ( uniqueKeysCache as UniqueKeysCachePluggable ) . start ( ) ;
124
- if ( telemetry && ( telemetry as ITelemetryCacheAsync ) . recordConfig ) ( telemetry as ITelemetryCacheAsync ) . recordConfig ( ) ;
125
-
126
- onReadyCb ( ) ;
127
- }
128
- } ) . catch ( ( e ) => {
129
- e = e || new Error ( 'Error connecting wrapper' ) ;
130
- onReadyCb ( e ) ;
131
- return e ; // Propagate error for shared clients
132
- } ) ;
133
- } ,
134
-
135
130
// Stop periodic flush and disconnect the underlying storage
136
131
destroy ( ) {
137
132
return Promise . all ( isSyncronizer ? [ ] : [
@@ -141,8 +136,8 @@ export function PluggableStorage(options: PluggableStorageOptions): IStorageAsyn
141
136
} ,
142
137
143
138
// emits SDK_READY event on shared clients and returns a reference to the storage
144
- shared ( _ : string , onReadyCb : ( error ?: any ) => void ) {
145
- this . init ( ) . then ( onReadyCb ) ;
139
+ shared ( _ , onReadyCb ) {
140
+ connectPromise . then ( onReadyCb ) ;
146
141
147
142
return {
148
143
...this ,
0 commit comments