@@ -45,6 +45,7 @@ public class SSEStream<T extends org.stellar.sdk.responses.Response> implements
45
45
private EventSource eventSource = null ;
46
46
private final Lock lock = new ReentrantLock ();
47
47
private final long reconnectTimeout ;
48
+ private final AtomicLong currentListenerId = new AtomicLong (0 );
48
49
49
50
private SSEStream (
50
51
final OkHttpClient okHttpClient ,
@@ -83,6 +84,7 @@ public void run() {
83
84
}
84
85
85
86
try {
87
+ // TODO: use Executors.newSingleThreadScheduledExecutor() instead
86
88
Thread .sleep (200 );
87
89
if (serverSideClosed .get () || clientSideClosed .get ()) {
88
90
// don't restart until true again
@@ -116,6 +118,7 @@ private void restart() {
116
118
if (eventSource != null ) {
117
119
eventSource .cancel ();
118
120
}
121
+ long newListenerId = currentListenerId .incrementAndGet ();
119
122
eventSource =
120
123
doStreamRequest (
121
124
this ,
@@ -129,11 +132,13 @@ private void restart() {
129
132
public void closed (EventSource source ) {
130
133
serverSideClosed .set (true );
131
134
}
132
- });
135
+ },
136
+ newListenerId );
133
137
}
134
138
135
139
public void close () {
136
140
isStopped .set (true );
141
+ currentListenerId .incrementAndGet (); // Prevent any future EventSource
137
142
if (eventSource != null ) {
138
143
eventSource .cancel ();
139
144
}
@@ -168,7 +173,8 @@ private static <T extends org.stellar.sdk.responses.Response> EventSource doStre
168
173
final Class <T > responseClass ,
169
174
final EventListener <T > listener ,
170
175
String url ,
171
- final CloseListener closeListener ) {
176
+ final CloseListener closeListener ,
177
+ long listenerId ) {
172
178
173
179
Request .Builder builder =
174
180
new Request .Builder ()
@@ -183,7 +189,7 @@ private static <T extends org.stellar.sdk.responses.Response> EventSource doStre
183
189
new RealEventSource (
184
190
request ,
185
191
new StellarEventSourceListener <T >(
186
- stream , closeListener , responseClass , requestBuilder , listener ));
192
+ stream , closeListener , responseClass , requestBuilder , listener , listenerId ));
187
193
eventSource .connect (okHttpClient );
188
194
return eventSource ;
189
195
}
@@ -200,22 +206,28 @@ private static class StellarEventSourceListener<T extends org.stellar.sdk.respon
200
206
private final Class <T > responseClass ;
201
207
private final RequestBuilder requestBuilder ;
202
208
private final EventListener <T > listener ;
209
+ private final long listenerId ;
203
210
204
211
StellarEventSourceListener (
205
212
SSEStream <T > stream ,
206
213
CloseListener closeListener ,
207
214
Class <T > responseClass ,
208
215
RequestBuilder requestBuilder ,
209
- EventListener <T > listener ) {
216
+ EventListener <T > listener ,
217
+ long listenerId ) {
210
218
this .stream = stream ;
211
219
this .closeListener = closeListener ;
212
220
this .responseClass = responseClass ;
213
221
this .requestBuilder = requestBuilder ;
214
222
this .listener = listener ;
223
+ this .listenerId = listenerId ;
215
224
}
216
225
217
226
@ Override
218
227
public void onClosed (EventSource eventSource ) {
228
+ if (listenerId != stream .currentListenerId .get ()) {
229
+ return ;
230
+ }
219
231
if (closeListener != null ) {
220
232
closeListener .closed (eventSource );
221
233
}
@@ -227,6 +239,9 @@ public void onOpen(EventSource eventSource, Response response) {}
227
239
@ Override
228
240
public void onFailure (
229
241
EventSource eventSource , @ Nullable Throwable t , @ Nullable Response response ) {
242
+ if (listenerId != stream .currentListenerId .get ()) {
243
+ return ;
244
+ }
230
245
Optional <Integer > code = Optional .empty ();
231
246
if (response != null ) {
232
247
code = Optional .of (response .code ());
@@ -248,6 +263,9 @@ public void onFailure(
248
263
@ Override
249
264
public void onEvent (
250
265
EventSource eventSource , @ Nullable String id , @ Nullable String type , String data ) {
266
+ if (listenerId != stream .currentListenerId .get ()) {
267
+ return ;
268
+ }
251
269
// Update the timestamp of the last received event.
252
270
stream .latestEventTime .set (System .currentTimeMillis ());
253
271
0 commit comments