Skip to content

Commit c203254

Browse files
author
Luís Fonseca
authored
Merge pull request #118 from pusher/fix-pong-timeout-disconnect
Immediately set connection state to disconnected due to a pong timeout
2 parents 1b6acfa + 44701cd commit c203254

File tree

5 files changed

+71
-18
lines changed

5 files changed

+71
-18
lines changed

src/main/java/com/pusher/client/connection/websocket/WebSocketClientWrapper.java

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,14 @@
2222
public class WebSocketClientWrapper extends WebSocketClient {
2323

2424
private static final String WSS_SCHEME = "wss";
25-
private final WebSocketListener webSocketListener;
25+
private WebSocketListener webSocketListener;
2626

2727
public WebSocketClientWrapper(final URI uri, final Proxy proxy, final WebSocketListener webSocketListener) throws SSLException {
2828
super(uri);
2929

3030
if (uri.getScheme().equals(WSS_SCHEME)) {
3131
try {
32-
SSLContext sslContext = null;
33-
sslContext = SSLContext.getInstance("TLS");
32+
SSLContext sslContext = SSLContext.getInstance("TLS");
3433
sslContext.init(null, null, null); // will use java's default
3534
// key and trust store which
3635
// is sufficient unless you
@@ -58,21 +57,36 @@ public WebSocketClientWrapper(final URI uri, final Proxy proxy, final WebSocketL
5857

5958
@Override
6059
public void onOpen(final ServerHandshake handshakedata) {
61-
webSocketListener.onOpen(handshakedata);
60+
if (webSocketListener != null) {
61+
webSocketListener.onOpen(handshakedata);
62+
}
6263
}
6364

6465
@Override
6566
public void onMessage(final String message) {
66-
webSocketListener.onMessage(message);
67+
if (webSocketListener != null) {
68+
webSocketListener.onMessage(message);
69+
}
6770
}
6871

6972
@Override
7073
public void onClose(final int code, final String reason, final boolean remote) {
71-
webSocketListener.onClose(code, reason, remote);
74+
if (webSocketListener != null) {
75+
webSocketListener.onClose(code, reason, remote);
76+
}
7277
}
7378

7479
@Override
7580
public void onError(final Exception ex) {
76-
webSocketListener.onError(ex);
81+
if (webSocketListener != null) {
82+
webSocketListener.onError(ex);
83+
}
84+
}
85+
86+
/**
87+
* Removes the WebSocketListener so that the underlying WebSocketClient doesn't expose any listener events.
88+
*/
89+
public void removeWebSocketListener() {
90+
webSocketListener = null;
7791
}
7892
}

src/main/java/com/pusher/client/connection/websocket/WebSocketConnection.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public class WebSocketConnection implements InternalConnection, WebSocketListene
4040
private final Proxy proxy;
4141

4242
private volatile ConnectionState state = ConnectionState.DISCONNECTED;
43-
private WebSocketClient underlyingConnection;
43+
private WebSocketClientWrapper underlyingConnection;
4444
private String socketId;
4545

4646
public WebSocketConnection(
@@ -251,18 +251,18 @@ public void run() {
251251

252252
@Override
253253
public void onClose(final int code, final String reason, final boolean remote) {
254+
if (state == ConnectionState.DISCONNECTED) {
255+
log.error("Received close from underlying socket when already disconnected. " + "Close code ["
256+
+ code + "], Reason [" + reason + "], Remote [" + remote + "]");
257+
return;
258+
}
259+
254260
activityTimer.cancelTimeouts();
255261

256262
factory.queueOnEventThread(new Runnable() {
257263
@Override
258264
public void run() {
259-
if (state != ConnectionState.DISCONNECTED) {
260-
updateState(ConnectionState.DISCONNECTED);
261-
}
262-
else {
263-
log.error("Received close from underlying socket when already disconnected. " + "Close code ["
264-
+ code + "], Reason [" + reason + "], Remote [" + remote + "]");
265-
}
265+
updateState(ConnectionState.DISCONNECTED);
266266
factory.shutdownThreads();
267267
}
268268
});
@@ -342,7 +342,15 @@ private synchronized void schedulePongCheck() {
342342
@Override
343343
public void run() {
344344
log.debug("Timed out awaiting pong from server - disconnecting");
345+
346+
underlyingConnection.removeWebSocketListener();
347+
345348
disconnect();
349+
350+
// Proceed immediately to handle the close
351+
// The WebSocketClient will attempt a graceful WebSocket shutdown by exchanging the close frames
352+
// but may not succeed if this disconnect was called due to pong timeout...
353+
onClose(-1, "Pong timeout", false);
346354
}
347355
}, pongTimeout, TimeUnit.MILLISECONDS);
348356
}

src/main/java/com/pusher/client/util/Factory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public synchronized InternalConnection getConnection(final String apiKey, final
6262
return connection;
6363
}
6464

65-
public WebSocketClient newWebSocketClientWrapper(final URI uri, final Proxy proxy, final WebSocketListener webSocketListener) throws SSLException {
65+
public WebSocketClientWrapper newWebSocketClientWrapper(final URI uri, final Proxy proxy, final WebSocketListener webSocketListener) throws SSLException {
6666
return new WebSocketClientWrapper(uri, proxy, webSocketListener);
6767
}
6868

src/test/java/com/pusher/client/connection/websocket/WebSocketClientWrapperTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.junit.Test;
1414
import org.junit.runner.RunWith;
1515
import org.mockito.Mock;
16+
import org.mockito.internal.verification.NoMoreInteractions;
1617
import org.mockito.runners.MockitoJUnitRunner;
1718

1819
@RunWith(MockitoJUnitRunner.class)
@@ -52,4 +53,15 @@ public void testOnErrorIsDelegatedToTheListener() {
5253
wrapper.onError(e);
5354
verify(mockListener).onError(e);
5455
}
56+
57+
@Test
58+
public void testRemoveWebSocketListener() {
59+
wrapper.onClose(1, "reason", true);
60+
verify(mockListener).onClose(1, "reason", true);
61+
62+
wrapper.removeWebSocketListener();
63+
64+
wrapper.onClose(1, "reason", true);
65+
verify(mockListener, new NoMoreInteractions()).onClose(1, "reason", true);
66+
}
5567
}

src/test/java/com/pusher/client/connection/websocket/WebSocketConnectionTest.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import java.net.Proxy;
99
import java.net.URI;
1010
import java.net.URISyntaxException;
11+
import java.util.concurrent.ScheduledThreadPoolExecutor;
1112

1213
import javax.net.ssl.SSLException;
1314

@@ -29,8 +30,8 @@
2930
@RunWith(MockitoJUnitRunner.class)
3031
public class WebSocketConnectionTest {
3132

32-
private static final long ACTIVITY_TIMEOUT = 120000;
33-
private static final long PONG_TIMEOUT = 30000;
33+
private static final long ACTIVITY_TIMEOUT = 500;
34+
private static final long PONG_TIMEOUT = 500;
3435
private static final String URL = "ws://ws.example.com/";
3536
private static final String EVENT_NAME = "my-event";
3637
private static final String CONN_ESTABLISHED_EVENT = "{\"event\":\"pusher:connection_established\",\"data\":\"{\\\"socket_id\\\":\\\"21112.816204\\\"}\"}";
@@ -278,6 +279,24 @@ public void testDisconnectInDisconnectingStateIsIgnored() {
278279
verify(mockEventListener, times(3)).onConnectionStateChange(any(ConnectionStateChange.class));
279280
}
280281

282+
@Test
283+
public void testPongTimeoutResultsInDisconnect() throws InterruptedException {
284+
when(factory.getTimers()).thenReturn(new ScheduledThreadPoolExecutor(2));
285+
286+
connection.connect();
287+
connection.onMessage(CONN_ESTABLISHED_EVENT);
288+
289+
verify(mockUnderlyingConnection, timeout((int) (ACTIVITY_TIMEOUT + PONG_TIMEOUT))).close();
290+
291+
verify(mockEventListener).onConnectionStateChange(
292+
new ConnectionStateChange(ConnectionState.CONNECTED, ConnectionState.DISCONNECTING));
293+
294+
verify(mockEventListener).onConnectionStateChange(
295+
new ConnectionStateChange(ConnectionState.DISCONNECTING, ConnectionState.DISCONNECTED));
296+
297+
assertEquals(ConnectionState.DISCONNECTED, connection.getState());
298+
}
299+
281300
/* end of tests */
282301

283302
private void connect() {

0 commit comments

Comments
 (0)