Skip to content

Commit 16d5b89

Browse files
committed
Merge pull request #102 from pusher/connect-threadsafely
Synchronize everything going on the event queue. Resolves #98 and #54
2 parents 8c50419 + fd33aab commit 16d5b89

File tree

10 files changed

+91
-117
lines changed

10 files changed

+91
-117
lines changed

src/main/java/com/pusher/client/channel/impl/ChannelImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public void onMessage(final String event, final String message) {
111111
for (final SubscriptionEventListener listener : listeners) {
112112
final String data = extractDataFrom(message);
113113

114-
factory.getEventQueue().execute(new Runnable() {
114+
factory.queueOnEventThread(new Runnable() {
115115
@Override
116116
public void run() {
117117
listener.onEvent(name, event, data);
@@ -155,7 +155,7 @@ public void updateState(final ChannelState state) {
155155
this.state = state;
156156

157157
if (state == ChannelState.SUBSCRIBED && eventListener != null) {
158-
factory.getEventQueue().execute(new Runnable() {
158+
factory.queueOnEventThread(new Runnable() {
159159
@Override
160160
public void run() {
161161
eventListener.onSubscriptionSucceeded(ChannelImpl.this.getName());

src/main/java/com/pusher/client/channel/impl/ChannelManager.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ public void onError(final String message, final String code, final Exception e)
131131

132132
private void sendOrQueueSubscribeMessage(final InternalChannel channel) {
133133

134-
factory.getEventQueue().execute(new Runnable() {
134+
factory.queueOnEventThread(new Runnable() {
135135

136136
@Override
137137
public void run() {
@@ -141,8 +141,7 @@ public void run() {
141141
final String message = channel.toSubscribeMessage();
142142
connection.sendMessage(message);
143143
channel.updateState(ChannelState.SUBSCRIBE_SENT);
144-
}
145-
catch (final AuthorizationFailureException e) {
144+
} catch (final AuthorizationFailureException e) {
146145
clearDownSubscription(channel, e);
147146
}
148147
}
@@ -151,7 +150,7 @@ public void run() {
151150
}
152151

153152
private void sendUnsubscribeMessage(final InternalChannel channel) {
154-
factory.getEventQueue().execute(new Runnable() {
153+
factory.queueOnEventThread(new Runnable() {
155154
@Override
156155
public void run() {
157156
connection.sendMessage(channel.toUnsubscribeMessage());
@@ -166,7 +165,7 @@ private void clearDownSubscription(final InternalChannel channel, final Exceptio
166165
channel.updateState(ChannelState.FAILED);
167166

168167
if (channel.getEventListener() != null) {
169-
factory.getEventQueue().execute(new Runnable() {
168+
factory.queueOnEventThread(new Runnable() {
170169

171170
@Override
172171
public void run() {

src/main/java/com/pusher/client/connection/websocket/WebSocketConnection.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public WebSocketConnection(
6161

6262
@Override
6363
public void connect() {
64-
factory.getEventQueue().execute(new Runnable() {
64+
factory.queueOnEventThread(new Runnable() {
6565

6666
@Override
6767
public void run() {
@@ -82,7 +82,7 @@ public void run() {
8282

8383
@Override
8484
public void disconnect() {
85-
factory.getEventQueue().execute(new Runnable() {
85+
factory.queueOnEventThread(new Runnable() {
8686
@Override
8787
public void run() {
8888
if (state == ConnectionState.CONNECTED) {
@@ -112,7 +112,7 @@ public ConnectionState getState() {
112112

113113
@Override
114114
public void sendMessage(final String message) {
115-
factory.getEventQueue().execute(new Runnable() {
115+
factory.queueOnEventThread(new Runnable() {
116116
@Override
117117
public void run() {
118118
try {
@@ -148,7 +148,7 @@ private void updateState(final ConnectionState newState) {
148148
interestedListeners.addAll(eventListeners.get(newState));
149149

150150
for (final ConnectionEventListener listener : interestedListeners) {
151-
factory.getEventQueue().execute(new Runnable() {
151+
factory.queueOnEventThread(new Runnable() {
152152
@Override
153153
public void run() {
154154
listener.onConnectionStateChange(change);
@@ -216,7 +216,7 @@ private void sendErrorToAllListeners(final String message, final String code, fi
216216
}
217217

218218
for (final ConnectionEventListener listener : allListeners) {
219-
factory.getEventQueue().execute(new Runnable() {
219+
factory.queueOnEventThread(new Runnable() {
220220
@Override
221221
public void run() {
222222
listener.onError(message, code, e);
@@ -237,7 +237,7 @@ public void onOpen(final ServerHandshake handshakedata) {
237237
public void onMessage(final String message) {
238238
activityTimer.activity();
239239

240-
factory.getEventQueue().execute(new Runnable() {
240+
factory.queueOnEventThread(new Runnable() {
241241
@Override
242242
public void run() {
243243
final Map<String, String> map = new Gson().fromJson(message, Map.class);
@@ -251,7 +251,7 @@ public void run() {
251251
public void onClose(final int code, final String reason, final boolean remote) {
252252
activityTimer.cancelTimeouts();
253253

254-
factory.getEventQueue().execute(new Runnable() {
254+
factory.queueOnEventThread(new Runnable() {
255255
@Override
256256
public void run() {
257257
if (state != ConnectionState.DISCONNECTED) {
@@ -268,7 +268,7 @@ public void run() {
268268

269269
@Override
270270
public void onError(final Exception ex) {
271-
factory.getEventQueue().execute(new Runnable() {
271+
factory.queueOnEventThread(new Runnable() {
272272
@Override
273273
public void run() {
274274
// Do not change connection state as Java_WebSocket will also

src/main/java/com/pusher/client/util/Factory.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public class Factory {
5050
private ChannelManager channelManager;
5151
private ExecutorService eventQueue;
5252
private ScheduledExecutorService timers;
53+
private static final Object eventLock = new Object();
5354

5455
public synchronized InternalConnection getConnection(final String apiKey, final PusherOptions options) {
5556
if (connection == null) {
@@ -68,13 +69,6 @@ public WebSocketClient newWebSocketClientWrapper(final URI uri, final Proxy prox
6869
return new WebSocketClientWrapper(uri, proxy, webSocketListener);
6970
}
7071

71-
public synchronized ExecutorService getEventQueue() {
72-
if (eventQueue == null) {
73-
eventQueue = Executors.newSingleThreadExecutor(new DaemonThreadFactory("eventQueue"));
74-
}
75-
return eventQueue;
76-
}
77-
7872
public synchronized ScheduledExecutorService getTimers() {
7973
if (timers == null) {
8074
timers = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("timers"));
@@ -103,6 +97,17 @@ public synchronized ChannelManager getChannelManager() {
10397
return channelManager;
10498
}
10599

100+
public void queueOnEventThread(final Runnable r) {
101+
getEventQueue().execute(new Runnable() {
102+
@Override
103+
public void run() {
104+
synchronized (eventLock) {
105+
r.run();
106+
}
107+
}
108+
});
109+
}
110+
106111
public synchronized void shutdownThreads() {
107112
if (eventQueue != null) {
108113
eventQueue.shutdown();
@@ -114,6 +119,13 @@ public synchronized void shutdownThreads() {
114119
}
115120
}
116121

122+
private synchronized ExecutorService getEventQueue() {
123+
if (eventQueue == null) {
124+
eventQueue = Executors.newSingleThreadExecutor(new DaemonThreadFactory("eventQueue"));
125+
}
126+
return eventQueue;
127+
}
128+
117129
private static class DaemonThreadFactory implements ThreadFactory {
118130
private final String name;
119131

src/test/java/com/pusher/client/EndToEndTest.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import com.pusher.client.connection.websocket.WebSocketListener;
2727
import com.pusher.client.util.DoNothingExecutor;
2828
import com.pusher.client.util.Factory;
29-
import com.pusher.client.util.InstantExecutor;
3029

3130
@RunWith(MockitoJUnitRunner.class)
3231
public class EndToEndTest {
@@ -56,7 +55,15 @@ public void setUp() throws Exception {
5655

5756
connection = new WebSocketConnection(pusherOptions.buildUrl(API_KEY), ACTIVITY_TIMEOUT, PONG_TIMEOUT, proxy, factory);
5857

59-
when(factory.getEventQueue()).thenReturn(new InstantExecutor());
58+
doAnswer(new Answer() {
59+
@Override
60+
public Object answer(InvocationOnMock invocation) throws Throwable {
61+
final Runnable r = (Runnable) invocation.getArguments()[0];
62+
r.run();
63+
return null;
64+
}
65+
}).when(factory).queueOnEventThread(any(Runnable.class));
66+
6067
when(factory.getTimers()).thenReturn(new DoNothingExecutor());
6168
when(factory.newWebSocketClientWrapper(any(URI.class), any(Proxy.class), any(WebSocketListener.class))).thenAnswer(
6269
new Answer<WebSocketClientWrapper>() {

src/test/java/com/pusher/client/PusherTest.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88
import org.junit.Test;
99
import org.junit.runner.RunWith;
1010
import org.mockito.Mock;
11+
import org.mockito.invocation.InvocationOnMock;
1112
import org.mockito.runners.MockitoJUnitRunner;
13+
import org.mockito.stubbing.Answer;
1214

1315
import com.pusher.client.channel.ChannelEventListener;
1416
import com.pusher.client.channel.PresenceChannelEventListener;
@@ -22,7 +24,6 @@
2224
import com.pusher.client.connection.impl.InternalConnection;
2325
import com.pusher.client.util.Factory;
2426
import com.pusher.client.util.HttpAuthorizer;
25-
import com.pusher.client.util.InstantExecutor;
2627

2728
@RunWith(MockitoJUnitRunner.class)
2829
public class PusherTest {
@@ -58,8 +59,14 @@ public void setUp() {
5859
.thenReturn(mockPrivateChannel);
5960
when(factory.newPresenceChannel(mockConnection, PRESENCE_CHANNEL_NAME, authorizer)).thenReturn(
6061
mockPresenceChannel);
61-
when(factory.getEventQueue()).thenReturn(new InstantExecutor());
62-
62+
doAnswer(new Answer() {
63+
@Override
64+
public Object answer(InvocationOnMock invocation) throws Throwable {
65+
final Runnable r = (Runnable) invocation.getArguments()[0];
66+
r.run();
67+
return null;
68+
}
69+
}).when(factory).queueOnEventThread(any(Runnable.class));
6370
pusher = new Pusher(API_KEY, options, factory);
6471
}
6572

src/test/java/com/pusher/client/channel/impl/ChannelImplTest.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,13 @@
1010
import org.junit.Test;
1111
import org.junit.runner.RunWith;
1212
import org.mockito.Mock;
13+
import org.mockito.invocation.InvocationOnMock;
1314
import org.mockito.runners.MockitoJUnitRunner;
15+
import org.mockito.stubbing.Answer;
1416

1517
import com.pusher.client.channel.ChannelEventListener;
1618
import com.pusher.client.channel.ChannelState;
1719
import com.pusher.client.util.Factory;
18-
import com.pusher.client.util.InstantExecutor;
1920

2021
@RunWith(MockitoJUnitRunner.class)
2122
public class ChannelImplTest {
@@ -27,7 +28,14 @@ public class ChannelImplTest {
2728

2829
@Before
2930
public void setUp() {
30-
when(factory.getEventQueue()).thenReturn(new InstantExecutor());
31+
doAnswer(new Answer() {
32+
@Override
33+
public Object answer(InvocationOnMock invocation) throws Throwable {
34+
final Runnable r = (Runnable) invocation.getArguments()[0];
35+
r.run();
36+
return null;
37+
}
38+
}).when(factory).queueOnEventThread(any(Runnable.class));
3139

3240
mockListener = getEventListener();
3341
channel = newInstance(getChannelName());

src/test/java/com/pusher/client/channel/impl/ChannelManagerTest.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88
import org.junit.runner.RunWith;
99
import static org.junit.Assert.*;
1010
import org.mockito.Mock;
11+
import org.mockito.invocation.InvocationOnMock;
1112
import org.mockito.runners.MockitoJUnitRunner;
13+
import org.mockito.stubbing.Answer;
1214

1315
import com.pusher.client.AuthorizationFailureException;
1416
import com.pusher.client.channel.Channel;
@@ -22,7 +24,6 @@
2224
import com.pusher.client.connection.ConnectionStateChange;
2325
import com.pusher.client.connection.impl.InternalConnection;
2426
import com.pusher.client.util.Factory;
25-
import com.pusher.client.util.InstantExecutor;
2627

2728
@RunWith(MockitoJUnitRunner.class)
2829
public class ChannelManagerTest {
@@ -48,12 +49,18 @@ public class ChannelManagerTest {
4849
private ChannelManager subscriptionTestChannelManager;
4950
private @Mock Factory subscriptionTestFactory;
5051
private @Mock InternalConnection subscriptionTestConnection;
51-
private @Mock InstantExecutor mockQueue;
5252

5353
@Before
5454
public void setUp() throws AuthorizationFailureException {
5555

56-
when(factory.getEventQueue()).thenReturn(new InstantExecutor());
56+
doAnswer(new Answer() {
57+
@Override
58+
public Object answer(InvocationOnMock invocation) throws Throwable {
59+
final Runnable r = (Runnable) invocation.getArguments()[0];
60+
r.run();
61+
return null;
62+
}
63+
}).when(factory).queueOnEventThread(any(Runnable.class));
5764
when(mockInternalChannel.getName()).thenReturn(CHANNEL_NAME);
5865
when(mockInternalChannel.toSubscribeMessage()).thenReturn(OUTGOING_SUBSCRIBE_MESSAGE);
5966
when(mockInternalChannel.toUnsubscribeMessage()).thenReturn(OUTGOING_UNSUBSCRIBE_MESSAGE);
@@ -71,7 +78,14 @@ public void setUp() throws AuthorizationFailureException {
7178
channelManager.setConnection(mockConnection);
7279

7380

74-
when(subscriptionTestFactory.getEventQueue()).thenReturn(mockQueue);
81+
doAnswer(new Answer() {
82+
@Override
83+
public Object answer(InvocationOnMock invocation) throws Throwable {
84+
final Runnable r = (Runnable) invocation.getArguments()[0];
85+
r.run();
86+
return null;
87+
}
88+
}).when(subscriptionTestFactory).queueOnEventThread(any(Runnable.class));
7589
subscriptionTestChannelManager = new ChannelManager(subscriptionTestFactory);
7690
subscriptionTestChannelManager.setConnection(subscriptionTestConnection);
7791

@@ -265,7 +279,7 @@ public void testUnsubscribeFromSubscribedChannelUnsubscribesInEventQueue() {
265279
subscriptionTestChannelManager.subscribeTo(mockInternalChannel, mockEventListener);
266280
subscriptionTestChannelManager.unsubscribeFrom(CHANNEL_NAME);
267281

268-
verify(mockQueue).execute(any(Runnable.class));
282+
verify(subscriptionTestFactory).queueOnEventThread(any(Runnable.class));
269283
}
270284

271285
@Test

src/test/java/com/pusher/client/connection/websocket/WebSocketConnectionTest.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,16 @@
1515
import org.junit.Test;
1616
import org.junit.runner.RunWith;
1717
import org.mockito.Mock;
18+
import org.mockito.invocation.InvocationOnMock;
1819
import org.mockito.runners.MockitoJUnitRunner;
20+
import org.mockito.stubbing.Answer;
1921

2022
import com.pusher.client.channel.impl.ChannelManager;
2123
import com.pusher.client.connection.ConnectionEventListener;
2224
import com.pusher.client.connection.ConnectionState;
2325
import com.pusher.client.connection.ConnectionStateChange;
2426
import com.pusher.client.util.DoNothingExecutor;
2527
import com.pusher.client.util.Factory;
26-
import com.pusher.client.util.InstantExecutor;
2728

2829
@RunWith(MockitoJUnitRunner.class)
2930
public class WebSocketConnectionTest {
@@ -53,7 +54,14 @@ public void setUp() throws URISyntaxException, SSLException {
5354
when(factory.getChannelManager()).thenReturn(mockChannelManager);
5455
when(factory.newWebSocketClientWrapper(any(URI.class), any(Proxy.class), any(WebSocketConnection.class))).thenReturn(
5556
mockUnderlyingConnection);
56-
when(factory.getEventQueue()).thenReturn(new InstantExecutor());
57+
doAnswer(new Answer() {
58+
@Override
59+
public Object answer(InvocationOnMock invocation) throws Throwable {
60+
final Runnable r = (Runnable) invocation.getArguments()[0];
61+
r.run();
62+
return null;
63+
}
64+
}).when(factory).queueOnEventThread(any(Runnable.class));
5765
when(factory.getTimers()).thenReturn(new DoNothingExecutor());
5866

5967
connection = new WebSocketConnection(URL, ACTIVITY_TIMEOUT, PONG_TIMEOUT, PROXY, factory);

0 commit comments

Comments
 (0)