11package com .launchdarkly .client ;
22
3+ import com .google .common .util .concurrent .ThreadFactoryBuilder ;
34import com .google .gson .Gson ;
45import com .launchdarkly .eventsource .EventHandler ;
56import com .launchdarkly .eventsource .EventSource ;
67import com .launchdarkly .eventsource .MessageEvent ;
8+ import com .launchdarkly .eventsource .ReadyState ;
79import okhttp3 .Headers ;
10+ import org .joda .time .DateTime ;
811import org .slf4j .Logger ;
912import org .slf4j .LoggerFactory ;
1013
1114import java .io .IOException ;
1215import java .net .URI ;
13- import java .util .concurrent .Future ;
16+ import java .util .concurrent .* ;
1417import java .util .concurrent .atomic .AtomicBoolean ;
1518
1619class StreamProcessor implements UpdateProcessor {
@@ -20,12 +23,15 @@ class StreamProcessor implements UpdateProcessor {
2023 private static final String INDIRECT_PUT = "indirect/put" ;
2124 private static final String INDIRECT_PATCH = "indirect/patch" ;
2225 private static final Logger logger = LoggerFactory .getLogger (StreamProcessor .class );
26+ private static final int DEAD_CONNECTION_INTERVAL_SECONDS = 300 ;
2327
2428 private final FeatureStore store ;
2529 private final LDConfig config ;
2630 private final String sdkKey ;
2731 private final FeatureRequestor requestor ;
28- private EventSource es ;
32+ private final ScheduledExecutorService heartbeatDetectorService ;
33+ private volatile DateTime lastHeartbeat ;
34+ private volatile EventSource es ;
2935 private AtomicBoolean initialized = new AtomicBoolean (false );
3036
3137
@@ -34,6 +40,11 @@ class StreamProcessor implements UpdateProcessor {
3440 this .config = config ;
3541 this .sdkKey = sdkKey ;
3642 this .requestor = requestor ;
43+ ThreadFactory threadFactory = new ThreadFactoryBuilder ()
44+ .setNameFormat ("LaunchDarkly-HeartbeatDetector-%d" )
45+ .build ();
46+ this .heartbeatDetectorService = Executors .newSingleThreadScheduledExecutor (threadFactory );
47+ heartbeatDetectorService .scheduleAtFixedRate (new HeartbeatDetector (), 1 , 1 , TimeUnit .MINUTES );
3748 }
3849
3950 @ Override
@@ -50,11 +61,11 @@ public Future<Void> start() {
5061
5162 @ Override
5263 public void onOpen () throws Exception {
53-
5464 }
5565
5666 @ Override
5767 public void onMessage (String name , MessageEvent event ) throws Exception {
68+ lastHeartbeat = DateTime .now ();
5869 Gson gson = new Gson ();
5970 switch (name ) {
6071 case PUT :
@@ -100,6 +111,12 @@ public void onMessage(String name, MessageEvent event) throws Exception {
100111 }
101112 }
102113
114+ @ Override
115+ public void onComment (String comment ) {
116+ logger .debug ("Received a heartbeat" );
117+ lastHeartbeat = DateTime .now ();
118+ }
119+
103120 @ Override
104121 public void onError (Throwable throwable ) {
105122 logger .error ("Encountered EventSource error: " + throwable .getMessage ());
@@ -125,6 +142,14 @@ public void close() throws IOException {
125142 if (store != null ) {
126143 store .close ();
127144 }
145+ if (heartbeatDetectorService != null ) {
146+ heartbeatDetectorService .shutdownNow ();
147+ try {
148+ heartbeatDetectorService .awaitTermination (100 , TimeUnit .MILLISECONDS );
149+ } catch (InterruptedException e ) {
150+ logger .error ("Encountered an exception terminating heartbeat detector: " + e .getMessage ());
151+ }
152+ }
128153 }
129154
130155 @ Override
@@ -171,4 +196,28 @@ int version() {
171196 }
172197
173198 }
199+
200+ private final class HeartbeatDetector implements Runnable {
201+
202+ @ Override
203+ public void run () {
204+ DateTime reconnectThresholdTime = DateTime .now ().minusSeconds (DEAD_CONNECTION_INTERVAL_SECONDS );
205+ // We only want to force the reconnect if the ES connection is open. If not, it's already trying to
206+ // connect anyway, or this processor was shut down
207+ if (lastHeartbeat .isBefore (reconnectThresholdTime ) && es .getState () == ReadyState .OPEN ) {
208+ try {
209+ logger .info ("Stream stopped receiving heartbeats- reconnecting." );
210+ es .close ();
211+ } catch (IOException e ) {
212+ logger .error ("Encountered exception closing stream connection: " + e .getMessage ());
213+ } finally {
214+ if (es .getState () == ReadyState .SHUTDOWN ) {
215+ start ();
216+ } else {
217+ logger .error ("Expected ES to be in state SHUTDOWN, but it's currently in state " + es .getState ().toString ());
218+ }
219+ }
220+ }
221+ }
222+ }
174223}
0 commit comments