-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathSyncManagerImp.java
More file actions
246 lines (232 loc) · 12.4 KB
/
SyncManagerImp.java
File metadata and controls
246 lines (232 loc) · 12.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
package io.split.engine.common;
import com.google.common.annotations.VisibleForTesting;
import io.split.client.ApiKeyCounter;
import io.split.client.SplitClientConfig;
import io.split.client.interceptors.FlagSetsFilter;
import io.split.engine.SDKReadinessGates;
import io.split.engine.experiments.SplitFetcher;
import io.split.engine.experiments.SplitParser;
import io.split.engine.experiments.SplitSynchronizationTask;
import io.split.engine.segments.SegmentSynchronizationTask;
import io.split.storages.SegmentCacheProducer;
import io.split.storages.SplitCacheProducer;
import io.split.telemetry.domain.StreamingEvent;
import io.split.telemetry.domain.enums.StreamEventsEnum;
import io.split.telemetry.storage.TelemetryRuntimeProducer;
import io.split.telemetry.synchronizer.TelemetrySynchronizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.split.client.utils.SplitExecutorFactory.buildExecutorService;
/**
 * {@link SyncManager} implementation that drives the synchronization lifecycle.
 *
 * <p>{@link #start()} retries {@code Synchronizer.syncAll()} on a background thread until it
 * succeeds, signals readiness through the {@link SDKReadinessGates}, and then starts either
 * streaming or polling depending on the configured flag. While streaming is in use, a second
 * background task ({@link #incomingPushStatusHandler()}) consumes {@link PushManager.Status}
 * notifications and switches between push workers and periodic fetching accordingly.
 */
public class SyncManagerImp implements SyncManager {
    private static final Logger _log = LoggerFactory.getLogger(SyncManagerImp.class);
    private static final long STARTING_SYNC_ALL_BACKOFF_MAX_WAIT_MS = 10000; // 10 seconds max wait

    private final AtomicBoolean _streamingEnabledConfig;
    private final AtomicBoolean _shuttedDown;
    private final Synchronizer _synchronizer;
    private final PushManager _pushManager;
    private final LinkedBlockingQueue<PushManager.Status> _incomingPushStatus;
    private final ExecutorService _pushMonitorExecutorService;
    private final ExecutorService _initializationtExecutorService;
    private final SDKReadinessGates _gates;
    private final Backoff _backoff;
    private final TelemetryRuntimeProducer _telemetryRuntimeProducer;
    private final TelemetrySynchronizer _telemetrySynchronizer;
    private final SplitClientConfig _config;
    private final long _startingSyncCallBackoffBaseMs;
    private final SegmentSynchronizationTask _segmentSynchronizationTaskImp;
    private final SplitSynchronizationTask _splitSynchronizationTask;
    private final SplitAPI _splitAPI;
    // Written by the initialization task (startStreamingMode) and read by the push-status
    // monitor task, hence volatile.
    private volatile Future<?> _pushStatusMonitorTask;

    /**
     * Creates a manager from already-built collaborators.
     *
     * @param splitTasks holder of the split and segment synchronization tasks closed on shutdown
     * @param streamingEnabledConfig {@code true} to start in streaming mode, {@code false} for polling
     * @param synchronizer performs the actual fetching; must not be null
     * @param pushManager controls the streaming connection and its workers; must not be null
     * @param pushMessages queue from which push status notifications are consumed
     * @param gates readiness gates notified once the first full sync succeeds; must not be null
     * @param telemetryRuntimeProducer sink for streaming telemetry events; must not be null
     * @param telemetrySynchronizer used to report the configuration after start-up; must not be null
     * @param config SDK configuration; must not be null
     * @param splitAPI closed at the end of {@link #shutdown()}; a null value is tolerated
     * @throws NullPointerException if any argument marked "must not be null" is null
     */
    @VisibleForTesting
    /* package private */ SyncManagerImp(SplitTasks splitTasks,
                                         boolean streamingEnabledConfig,
                                         Synchronizer synchronizer,
                                         PushManager pushManager,
                                         LinkedBlockingQueue<PushManager.Status> pushMessages,
                                         SDKReadinessGates gates,
                                         TelemetryRuntimeProducer telemetryRuntimeProducer,
                                         TelemetrySynchronizer telemetrySynchronizer,
                                         SplitClientConfig config,
                                         SplitAPI splitAPI) {
        // Validate every required collaborator before building the executor services, so that a
        // rejected argument does not leave executors behind that nobody will ever shut down.
        _config = checkNotNull(config);
        _synchronizer = checkNotNull(synchronizer);
        _pushManager = checkNotNull(pushManager);
        _gates = checkNotNull(gates);
        _telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer);
        _telemetrySynchronizer = checkNotNull(telemetrySynchronizer);
        _segmentSynchronizationTaskImp = checkNotNull(splitTasks.getSegmentSynchronizationTask());
        _splitSynchronizationTask = checkNotNull(splitTasks.getSplitSynchronizationTask());

        _streamingEnabledConfig = new AtomicBoolean(streamingEnabledConfig);
        _shuttedDown = new AtomicBoolean(false);
        _incomingPushStatus = pushMessages;
        _splitAPI = splitAPI;
        _backoff = new Backoff(_config.authRetryBackoffBase());
        _startingSyncCallBackoffBaseMs = _config.startingSyncCallBackoffBaseMs();
        _pushMonitorExecutorService = buildExecutorService(_config.getThreadFactory(), "SPLIT-PushStatusMonitor-%d");
        _initializationtExecutorService = buildExecutorService(_config.getThreadFactory(), "SPLIT-Initialization-%d");
    }

    /**
     * Builds a manager together with its {@link Synchronizer} and {@link PushManager}, wiring both
     * to a shared push-status queue.
     *
     * @return a new, not yet started, {@code SyncManagerImp}
     */
    public static SyncManagerImp build(SplitTasks splitTasks,
                                       SplitFetcher splitFetcher,
                                       SplitCacheProducer splitCacheProducer,
                                       SplitAPI splitAPI,
                                       SegmentCacheProducer segmentCacheProducer,
                                       SDKReadinessGates gates,
                                       TelemetryRuntimeProducer telemetryRuntimeProducer,
                                       TelemetrySynchronizer telemetrySynchronizer,
                                       SplitClientConfig config,
                                       SplitParser splitParser,
                                       FlagSetsFilter flagSetsFilter) {
        // The same queue instance is handed to the push manager and to this manager.
        LinkedBlockingQueue<PushManager.Status> pushMessages = new LinkedBlockingQueue<>();
        Synchronizer synchronizer = new SynchronizerImp(splitTasks,
                splitFetcher,
                splitCacheProducer,
                segmentCacheProducer,
                config.streamingRetryDelay(),
                config.streamingFetchMaxRetries(),
                config.failedAttemptsBeforeLogging(),
                config.getSetsFilter());
        PushManager pushManager = PushManagerImp.build(synchronizer,
                config.streamingServiceURL(),
                config.authServiceURL(),
                splitAPI,
                pushMessages,
                telemetryRuntimeProducer,
                config.getThreadFactory(),
                splitParser,
                splitCacheProducer,
                flagSetsFilter);
        return new SyncManagerImp(splitTasks,
                config.streamingEnabled(),
                synchronizer,
                pushManager,
                pushMessages,
                gates,
                telemetryRuntimeProducer,
                telemetrySynchronizer,
                config,
                splitAPI);
    }

    /**
     * Schedules the initial synchronization on the initialization executor and returns
     * immediately; the actual work happens in {@link #initialize()}.
     */
    @Override
    public void start() {
        _initializationtExecutorService.submit(this::initialize);
    }

    /**
     * Stops synchronization and closes the owned tasks. Only the first call performs the
     * teardown; later (or concurrent) calls return without doing anything.
     *
     * @throws IOException if one of the closed collaborators fails with it
     */
    @Override
    public void shutdown() throws IOException {
        _log.info("Shutting down SyncManagerImp");
        // Atomic test-and-set so that two concurrent callers cannot both run the teardown.
        if (!_shuttedDown.compareAndSet(false, true)) {
            return;
        }
        _initializationtExecutorService.shutdownNow();
        _synchronizer.stopPeriodicFetching();
        if (_streamingEnabledConfig.get()) {
            _pushManager.stop();
            _pushMonitorExecutorService.shutdownNow();
        }
        _segmentSynchronizationTaskImp.close();
        _log.info("Successful shutdown of segment fetchers");
        _splitSynchronizationTask.close();
        _log.info("Successful shutdown of splits");
        _synchronizer.stopPeriodicDataRecording();
        // The constructor does not require a SplitAPI, so guard against a missing one here.
        if (_splitAPI != null) {
            _splitAPI.close();
        }
    }

    /**
     * Body of the initialization task: waits for the first successful full sync, marks the SDK
     * as internally ready, starts the configured sync mode and reports the configuration.
     */
    private void initialize() {
        try {
            if (!awaitInitialSync() || _shuttedDown.get()) {
                return;
            }
            _log.debug("SyncAll Ready");
            _gates.sdkInternalReady();
            if (_streamingEnabledConfig.get()) {
                startStreamingMode();
            } else {
                startPollingMode();
            }
            _synchronizer.startPeriodicDataRecording();
            _telemetrySynchronizer.synchronizeConfig(_config, System.currentTimeMillis(),
                    ApiKeyCounter.getApiKeyCounterInstance().getFactoryInstances(), new ArrayList<>());
        } catch (RuntimeException e) {
            // The Future returned by submit() is discarded, so without this the failure would
            // never be reported anywhere.
            _log.error("Unexpected error while initializing synchronization", e);
        }
    }

    /**
     * Calls {@code syncAll()} until it reports success, sleeping a backoff interval between
     * attempts.
     *
     * @return {@code true} once a sync succeeded; {@code false} if the thread was interrupted
     *         while waiting (the interrupt flag is restored)
     */
    private boolean awaitInitialSync() {
        Backoff startBackoff = new Backoff(_startingSyncCallBackoffBaseMs, STARTING_SYNC_ALL_BACKOFF_MAX_WAIT_MS);
        while (!_synchronizer.syncAll()) {
            try {
                Thread.sleep(startBackoff.interval());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    /**
     * Submits the push-status monitor task (once), starts the push manager and records the
     * sync-mode telemetry event for streaming.
     */
    private void startStreamingMode() {
        _log.debug("Starting in streaming mode ...");
        if (null == _pushStatusMonitorTask) {
            _pushStatusMonitorTask = _pushMonitorExecutorService.submit(this::incomingPushStatusHandler);
        }
        _pushManager.start();
        _telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SYNC_MODE_UPDATE.getType(),
                StreamEventsEnum.SyncModeUpdateValues.STREAMING_EVENT.getValue(), System.currentTimeMillis()));
    }

    /**
     * Starts periodic fetching and records the sync-mode telemetry event for polling.
     */
    private void startPollingMode() {
        _log.debug("Starting in polling mode ...");
        _synchronizer.startPeriodicFetching();
        _telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SYNC_MODE_UPDATE.getType(),
                StreamEventsEnum.SyncModeUpdateValues.POLLING_EVENT.getValue(), System.currentTimeMillis()));
    }

    /**
     * Consumes push status notifications until the thread is interrupted or an unrecoverable
     * streaming error is reported.
     *
     * <ul>
     *   <li>{@code STREAMING_READY}: stops polling, runs a full sync, starts the push workers and
     *       resets the retry backoff.</li>
     *   <li>{@code STREAMING_DOWN}: stops the push workers and falls back to polling.</li>
     *   <li>{@code STREAMING_BACKOFF}: falls back to polling, stops the push manager, waits a
     *       backoff interval and starts the push manager again.</li>
     *   <li>{@code STREAMING_OFF}: stops the push manager, falls back to polling and ends this
     *       task.</li>
     * </ul>
     *
     * Polling is never started once {@link #shutdown()} has been called.
     */
    @VisibleForTesting
    /* package private */ void incomingPushStatusHandler() {
        while (!Thread.interrupted()) {
            try {
                PushManager.Status status = _incomingPushStatus.take();
                _log.debug("Streaming status received: {}", status);
                switch (status) {
                    case STREAMING_READY:
                        _synchronizer.stopPeriodicFetching();
                        _synchronizer.syncAll();
                        _pushManager.startWorkers();
                        _pushManager.scheduleConnectionReset();
                        _backoff.reset();
                        _telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.STREAMING_STATUS.getType(),
                                StreamEventsEnum.StreamingStatusValues.STREAMING_ENABLED.getValue(), System.currentTimeMillis()));
                        _log.info("Streaming up and running.");
                        break;
                    case STREAMING_DOWN:
                        _log.info("Streaming service temporarily unavailable, working in polling mode.");
                        _pushManager.stopWorkers();
                        startPollingUnlessShutDown();
                        break;
                    case STREAMING_BACKOFF:
                        retryStreamingAfterBackoff();
                        break;
                    case STREAMING_OFF:
                        _log.info("Unrecoverable error in streaming subsystem. SDK will work in polling-mode and will not retry an SSE connection.");
                        _pushManager.stop();
                        startPollingUnlessShutDown();
                        if (null != _pushStatusMonitorTask) {
                            _pushStatusMonitorTask.cancel(false);
                        }
                        return; // Stop this task for the rest of the SDK lifetime
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    /**
     * Handles a retryable streaming error: switches to polling, stops the push manager, sleeps
     * for the next backoff interval (expressed in seconds) and then restarts the push manager.
     *
     * @throws InterruptedException if the thread is interrupted during the backoff sleep
     */
    private void retryStreamingAfterBackoff() throws InterruptedException {
        long howLong = _backoff.interval();
        _log.info("Retryable error in streaming subsystem. Switching to polling and retrying in {} seconds", howLong);
        startPollingUnlessShutDown();
        _pushManager.stop();
        Thread.sleep(howLong * 1000);
        // Do not reopen the streaming connection if the SDK was shut down while we were waiting.
        if (_shuttedDown.get()) {
            return;
        }
        // Drop any statuses queued while the push manager was stopped before reconnecting.
        _incomingPushStatus.clear();
        _pushManager.start();
    }

    /**
     * Starts periodic fetching unless the SDK is being shut down; starting polling at that point
     * could leave polling threads running and make a graceful shutdown fail.
     */
    private void startPollingUnlessShutDown() {
        if (!_shuttedDown.get()) {
            _synchronizer.startPeriodicFetching();
        }
    }
}