Skip to content

Commit abc7d5d

Browse files
authored
Merge pull request #125 from pusher/feature/reconnect
Add Reconnecting logic when you lose connectivity.
2 parents 537bd51 + f32956f commit abc7d5d

File tree

3 files changed

+108
-19
lines changed

3 files changed

+108
-19
lines changed

src/main/java/com/pusher/client/connection/ConnectionState.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,5 @@
44
* Represents connection states e.g. connected and disconnected.
55
*/
66
public enum ConnectionState {
7-
CONNECTING, CONNECTED, DISCONNECTING, DISCONNECTED, ALL
7+
CONNECTING, CONNECTED, DISCONNECTING, DISCONNECTED, RECONNECTING, ALL
88
}

src/main/java/com/pusher/client/connection/websocket/WebSocketConnection.java

Lines changed: 57 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313

1414
import javax.net.ssl.SSLException;
1515

16-
import org.java_websocket.client.WebSocketClient;
1716
import org.java_websocket.handshake.ServerHandshake;
1817
import org.slf4j.Logger;
1918
import org.slf4j.LoggerFactory;
@@ -31,7 +30,9 @@ public class WebSocketConnection implements InternalConnection, WebSocketListene
3130
private static final Gson GSON = new Gson();
3231

3332
private static final String INTERNAL_EVENT_PREFIX = "pusher:";
34-
static final String PING_EVENT_SERIALIZED = "{\"event\": \"pusher:ping\"}";
33+
private static final String PING_EVENT_SERIALIZED = "{\"event\": \"pusher:ping\"}";
34+
private static final int MAX_RECONNECTION_ATTEMPTS = 6; //Taken from the Swift lib
35+
private static final int MAX_RECONNECT_GAP_IN_SECONDS = 30;
3536

3637
private final Factory factory;
3738
private final ActivityTimer activityTimer;
@@ -42,6 +43,8 @@ public class WebSocketConnection implements InternalConnection, WebSocketListene
4243
private volatile ConnectionState state = ConnectionState.DISCONNECTED;
4344
private WebSocketClientWrapper underlyingConnection;
4445
private String socketId;
46+
private int reconnectAttempts = 0;
47+
4548

4649
public WebSocketConnection(
4750
final String url,
@@ -68,20 +71,24 @@ public void connect() {
6871
@Override
6972
public void run() {
7073
if (state == ConnectionState.DISCONNECTED) {
71-
try {
72-
underlyingConnection = factory
73-
.newWebSocketClientWrapper(webSocketUri, proxy, WebSocketConnection.this);
74-
updateState(ConnectionState.CONNECTING);
75-
underlyingConnection.connect();
76-
}
77-
catch (final SSLException e) {
78-
sendErrorToAllListeners("Error connecting over SSL", null, e);
79-
}
74+
tryConnecting();
8075
}
8176
}
8277
});
8378
}
8479

80+
private void tryConnecting(){
81+
try {
82+
underlyingConnection = factory
83+
.newWebSocketClientWrapper(webSocketUri, proxy, WebSocketConnection.this);
84+
updateState(ConnectionState.CONNECTING);
85+
underlyingConnection.connect();
86+
}
87+
catch (final SSLException e) {
88+
sendErrorToAllListeners("Error connecting over SSL", null, e);
89+
}
90+
}
91+
8592
@Override
8693
public void disconnect() {
8794
factory.queueOnEventThread(new Runnable() {
@@ -185,6 +192,7 @@ private void handleConnectionMessage(final String message) {
185192
socketId = (String)dataMap.get("socket_id");
186193

187194
updateState(ConnectionState.CONNECTED);
195+
reconnectAttempts = 0;
188196
}
189197

190198
@SuppressWarnings("rawtypes")
@@ -251,12 +259,45 @@ public void run() {
251259

252260
@Override
253261
public void onClose(final int code, final String reason, final boolean remote) {
254-
if (state == ConnectionState.DISCONNECTED) {
255-
log.error("Received close from underlying socket when already disconnected. " + "Close code ["
262+
if (state == ConnectionState.DISCONNECTED || state == ConnectionState.RECONNECTING) {
263+
log.error("Received close from underlying socket when already disconnected." + "Close code ["
256264
+ code + "], Reason [" + reason + "], Remote [" + remote + "]");
257265
return;
258266
}
259267

268+
//Reconnection logic
269+
if(state == ConnectionState.CONNECTED || state == ConnectionState.CONNECTING){
270+
271+
if(reconnectAttempts < MAX_RECONNECTION_ATTEMPTS){
272+
tryReconnecting();
273+
}
274+
else{
275+
updateState(ConnectionState.DISCONNECTING);
276+
cancelTimeoutsAndTransitonToDisconnected();
277+
}
278+
return;
279+
}
280+
281+
if (state == ConnectionState.DISCONNECTING){
282+
cancelTimeoutsAndTransitonToDisconnected();
283+
}
284+
}
285+
286+
private void tryReconnecting() {
287+
reconnectAttempts++;
288+
updateState(ConnectionState.RECONNECTING);
289+
long reconnectInterval = Math.min(MAX_RECONNECT_GAP_IN_SECONDS, reconnectAttempts * reconnectAttempts);
290+
291+
factory.getTimers().schedule(new Runnable() {
292+
@Override
293+
public void run() {
294+
underlyingConnection.removeWebSocketListener();
295+
tryConnecting();
296+
}
297+
}, reconnectInterval, TimeUnit.SECONDS);
298+
}
299+
300+
private void cancelTimeoutsAndTransitonToDisconnected() {
260301
activityTimer.cancelTimeouts();
261302

262303
factory.queueOnEventThread(new Runnable() {
@@ -290,7 +331,7 @@ private class ActivityTimer {
290331
private Future<?> pingTimer;
291332
private Future<?> pongTimer;
292333

293-
public ActivityTimer(final long activityTimeout, final long pongTimeout) {
334+
ActivityTimer(final long activityTimeout, final long pongTimeout) {
294335
this.activityTimeout = activityTimeout;
295336
this.pongTimeout = pongTimeout;
296337
}
@@ -299,7 +340,7 @@ public ActivityTimer(final long activityTimeout, final long pongTimeout) {
299340
* On any activity from the server - Cancel pong timeout - Cancel
300341
* currently ping timeout and re-schedule
301342
*/
302-
public synchronized void activity() {
343+
synchronized void activity() {
303344
if (pongTimer != null) {
304345
pongTimer.cancel(true);
305346
}
@@ -320,7 +361,7 @@ public void run() {
320361
/**
321362
* Cancel any pending timeouts, for example because we are disconnected.
322363
*/
323-
public synchronized void cancelTimeouts() {
364+
synchronized void cancelTimeouts() {
324365
if (pingTimer != null) {
325366
pingTimer.cancel(false);
326367
}

src/test/java/com/pusher/client/connection/websocket/WebSocketConnectionTest.java

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,14 +199,22 @@ public void testReceiveUserMessagePassesMessageToChannelManager() {
199199
}
200200

201201
@Test
202-
public void testOnCloseCallbackUpdatesStateToDisconnected() {
202+
public void testOnCloseCallbackUpdatesStateToDisconnectedWhenPreviousStateIsDisconnecting() {
203203
connection.connect();
204204
verify(mockEventListener).onConnectionStateChange(
205205
new ConnectionStateChange(ConnectionState.DISCONNECTED, ConnectionState.CONNECTING));
206206

207+
connection.onMessage(CONN_ESTABLISHED_EVENT);
208+
verify(mockEventListener).onConnectionStateChange(
209+
new ConnectionStateChange(ConnectionState.CONNECTING, ConnectionState.CONNECTED));
210+
211+
connection.disconnect();
212+
verify(mockEventListener).onConnectionStateChange(
213+
new ConnectionStateChange(ConnectionState.CONNECTED, ConnectionState.DISCONNECTING));
214+
207215
connection.onClose(1, "reason", true);
208216
verify(mockEventListener).onConnectionStateChange(
209-
new ConnectionStateChange(ConnectionState.CONNECTING, ConnectionState.DISCONNECTED));
217+
new ConnectionStateChange(ConnectionState.DISCONNECTING, ConnectionState.DISCONNECTED));
210218
}
211219

212220
@Test
@@ -297,6 +305,46 @@ public void testPongTimeoutResultsInDisconnect() throws InterruptedException {
297305
assertEquals(ConnectionState.DISCONNECTED, connection.getState());
298306
}
299307

308+
@Test
309+
public void stateIsReconnectingAfterOnCloseWithoutTheUserDisconnecting() throws InterruptedException, SSLException {
310+
connection.connect();
311+
connection.onMessage(CONN_ESTABLISHED_EVENT);
312+
313+
connection.onClose(500, "reason", true);
314+
315+
assertEquals(ConnectionState.RECONNECTING, connection.getState());
316+
}
317+
318+
@Test
319+
public void stateIsReconnectingAfterTryingToConnectForTheFirstTime() throws InterruptedException, SSLException {
320+
connection.connect();
321+
322+
connection.onClose(500, "reason", true);
323+
324+
assertEquals(ConnectionState.RECONNECTING, connection.getState());
325+
}
326+
327+
// TODO: leaving the following tests commented out just for reference. The lib needs to be rearchitected before we can hope to get any of these in
328+
// @Test
329+
// public void reconnectingLogicActuallyBeingCalled(){
330+
// fail("not implemented");
331+
// }
332+
//
333+
// @Test
334+
// public void retryMaximumNumberOfTimes(){
335+
// fail("not implemented");
336+
// }
337+
//
338+
// @Test
339+
// public void disconnectAfterTooManyRetries(){
340+
// fail("not implemented");
341+
// }
342+
//
343+
// @Test
344+
// public void retryWithTimeout(){
345+
// fail("not implemented");
346+
// }
347+
300348
/* end of tests */
301349

302350
private void connect() {

0 commit comments

Comments
 (0)