Skip to content

Commit 44701cd

Browse files
author
Luis Fonseca
committed
Perform an ungraceful disconnect only when server is not responding to pongs
1 parent 7720836 commit 44701cd

File tree

5 files changed

+65
-37
lines changed

5 files changed

+65
-37
lines changed

src/main/java/com/pusher/client/connection/websocket/WebSocketClientWrapper.java

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,14 @@
2222
public class WebSocketClientWrapper extends WebSocketClient {
2323

2424
private static final String WSS_SCHEME = "wss";
25-
private final WebSocketListener webSocketListener;
25+
private WebSocketListener webSocketListener;
2626

2727
public WebSocketClientWrapper(final URI uri, final Proxy proxy, final WebSocketListener webSocketListener) throws SSLException {
2828
super(uri);
2929

3030
if (uri.getScheme().equals(WSS_SCHEME)) {
3131
try {
32-
SSLContext sslContext = null;
33-
sslContext = SSLContext.getInstance("TLS");
32+
SSLContext sslContext = SSLContext.getInstance("TLS");
3433
sslContext.init(null, null, null); // will use java's default
3534
// key and trust store which
3635
// is sufficient unless you
@@ -58,21 +57,36 @@ public WebSocketClientWrapper(final URI uri, final Proxy proxy, final WebSocketL
5857

5958
@Override
6059
public void onOpen(final ServerHandshake handshakedata) {
61-
webSocketListener.onOpen(handshakedata);
60+
if (webSocketListener != null) {
61+
webSocketListener.onOpen(handshakedata);
62+
}
6263
}
6364

6465
@Override
6566
public void onMessage(final String message) {
66-
webSocketListener.onMessage(message);
67+
if (webSocketListener != null) {
68+
webSocketListener.onMessage(message);
69+
}
6770
}
6871

6972
@Override
7073
public void onClose(final int code, final String reason, final boolean remote) {
71-
webSocketListener.onClose(code, reason, remote);
74+
if (webSocketListener != null) {
75+
webSocketListener.onClose(code, reason, remote);
76+
}
7277
}
7378

7479
@Override
7580
public void onError(final Exception ex) {
76-
webSocketListener.onError(ex);
81+
if (webSocketListener != null) {
82+
webSocketListener.onError(ex);
83+
}
84+
}
85+
86+
/**
87+
* Removes the WebSocketListener so that the underlying WebSocketClient doesn't expose any listener events.
88+
*/
89+
public void removeWebSocketListener() {
90+
webSocketListener = null;
7791
}
7892
}

src/main/java/com/pusher/client/connection/websocket/WebSocketConnection.java

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public class WebSocketConnection implements InternalConnection, WebSocketListene
4040
private final Proxy proxy;
4141

4242
private volatile ConnectionState state = ConnectionState.DISCONNECTED;
43-
private WebSocketClient underlyingConnection;
43+
private WebSocketClientWrapper underlyingConnection;
4444
private String socketId;
4545

4646
public WebSocketConnection(
@@ -90,11 +90,6 @@ public void run() {
9090
if (state == ConnectionState.CONNECTED) {
9191
updateState(ConnectionState.DISCONNECTING);
9292
underlyingConnection.close();
93-
94-
// Proceed immediately to handle the close
95-
// The WebSocketClient will attempt a graceful WebSocket shutdown by exchanging the close frames
96-
// but may not succeed if this disconnect was called due to pong timeout...
97-
onClose(-1, "Pong timeout", false);
9893
}
9994
}
10095
});
@@ -256,21 +251,21 @@ public void run() {
256251

257252
@Override
258253
public void onClose(final int code, final String reason, final boolean remote) {
259-
if (state != ConnectionState.DISCONNECTED) {
260-
activityTimer.cancelTimeouts();
261-
262-
factory.queueOnEventThread(new Runnable() {
263-
@Override
264-
public void run() {
265-
updateState(ConnectionState.DISCONNECTED);
266-
factory.shutdownThreads();
267-
}
268-
});
269-
} else {
270-
// Sometimes expected when we disconnected due to pong timeout
254+
if (state == ConnectionState.DISCONNECTED) {
271255
log.error("Received close from underlying socket when already disconnected. " + "Close code ["
272256
+ code + "], Reason [" + reason + "], Remote [" + remote + "]");
257+
return;
273258
}
259+
260+
activityTimer.cancelTimeouts();
261+
262+
factory.queueOnEventThread(new Runnable() {
263+
@Override
264+
public void run() {
265+
updateState(ConnectionState.DISCONNECTED);
266+
factory.shutdownThreads();
267+
}
268+
});
274269
}
275270

276271
@Override
@@ -347,7 +342,15 @@ private synchronized void schedulePongCheck() {
347342
@Override
348343
public void run() {
349344
log.debug("Timed out awaiting pong from server - disconnecting");
345+
346+
underlyingConnection.removeWebSocketListener();
347+
350348
disconnect();
349+
350+
// Proceed immediately to handle the close
351+
// The WebSocketClient will attempt a graceful WebSocket shutdown by exchanging the close frames
352+
// but may not succeed if this disconnect was called due to pong timeout...
353+
onClose(-1, "Pong timeout", false);
351354
}
352355
}, pongTimeout, TimeUnit.MILLISECONDS);
353356
}

src/main/java/com/pusher/client/util/Factory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public synchronized InternalConnection getConnection(final String apiKey, final
6262
return connection;
6363
}
6464

65-
public WebSocketClient newWebSocketClientWrapper(final URI uri, final Proxy proxy, final WebSocketListener webSocketListener) throws SSLException {
65+
public WebSocketClientWrapper newWebSocketClientWrapper(final URI uri, final Proxy proxy, final WebSocketListener webSocketListener) throws SSLException {
6666
return new WebSocketClientWrapper(uri, proxy, webSocketListener);
6767
}
6868

src/test/java/com/pusher/client/connection/websocket/WebSocketClientWrapperTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.junit.Test;
1414
import org.junit.runner.RunWith;
1515
import org.mockito.Mock;
16+
import org.mockito.internal.verification.NoMoreInteractions;
1617
import org.mockito.runners.MockitoJUnitRunner;
1718

1819
@RunWith(MockitoJUnitRunner.class)
@@ -52,4 +53,15 @@ public void testOnErrorIsDelegatedToTheListener() {
5253
wrapper.onError(e);
5354
verify(mockListener).onError(e);
5455
}
56+
57+
@Test
58+
public void testRemoveWebSocketListener() {
59+
wrapper.onClose(1, "reason", true);
60+
verify(mockListener).onClose(1, "reason", true);
61+
62+
wrapper.removeWebSocketListener();
63+
64+
wrapper.onClose(1, "reason", true);
65+
verify(mockListener, new NoMoreInteractions()).onClose(1, "reason", true);
66+
}
5567
}

src/test/java/com/pusher/client/connection/websocket/WebSocketConnectionTest.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import java.net.Proxy;
99
import java.net.URI;
1010
import java.net.URISyntaxException;
11+
import java.util.concurrent.ScheduledThreadPoolExecutor;
1112

1213
import javax.net.ssl.SSLException;
1314

@@ -29,8 +30,8 @@
2930
@RunWith(MockitoJUnitRunner.class)
3031
public class WebSocketConnectionTest {
3132

32-
private static final long ACTIVITY_TIMEOUT = 120000;
33-
private static final long PONG_TIMEOUT = 30000;
33+
private static final long ACTIVITY_TIMEOUT = 500;
34+
private static final long PONG_TIMEOUT = 500;
3435
private static final String URL = "ws://ws.example.com/";
3536
private static final String EVENT_NAME = "my-event";
3637
private static final String CONN_ESTABLISHED_EVENT = "{\"event\":\"pusher:connection_established\",\"data\":\"{\\\"socket_id\\\":\\\"21112.816204\\\"}\"}";
@@ -246,7 +247,7 @@ public void testDisconnectInConnectedStateUpdatesStateToDisconnectingAndNotifies
246247
connection.disconnect();
247248
verify(mockEventListener).onConnectionStateChange(
248249
new ConnectionStateChange(ConnectionState.CONNECTED, ConnectionState.DISCONNECTING));
249-
assertEquals(ConnectionState.DISCONNECTED, connection.getState());
250+
assertEquals(ConnectionState.DISCONNECTING, connection.getState());
250251
}
251252

252253
@Test
@@ -275,20 +276,18 @@ public void testDisconnectInDisconnectingStateIsIgnored() {
275276
connection.disconnect();
276277

277278
verify(mockUnderlyingConnection, times(1)).close();
278-
verify(mockEventListener, times(4)).onConnectionStateChange(any(ConnectionStateChange.class));
279+
verify(mockEventListener, times(3)).onConnectionStateChange(any(ConnectionStateChange.class));
279280
}
280281

281282
@Test
282-
public void testDisconnectImmediatelyCallsOnClose() {
283-
connection.connect();
284-
verify(mockEventListener).onConnectionStateChange(
285-
new ConnectionStateChange(ConnectionState.DISCONNECTED, ConnectionState.CONNECTING));
283+
public void testPongTimeoutResultsInDisconnect() throws InterruptedException {
284+
when(factory.getTimers()).thenReturn(new ScheduledThreadPoolExecutor(2));
286285

286+
connection.connect();
287287
connection.onMessage(CONN_ESTABLISHED_EVENT);
288-
verify(mockEventListener).onConnectionStateChange(
289-
new ConnectionStateChange(ConnectionState.CONNECTING, ConnectionState.CONNECTED));
290288

291-
connection.disconnect();
289+
verify(mockUnderlyingConnection, timeout((int) (ACTIVITY_TIMEOUT + PONG_TIMEOUT))).close();
290+
292291
verify(mockEventListener).onConnectionStateChange(
293292
new ConnectionStateChange(ConnectionState.CONNECTED, ConnectionState.DISCONNECTING));
294293

0 commit comments

Comments
 (0)