Skip to content

Commit bbb890d

Browse files
authored
Add support for user signin and server to user messages (#327)
1 parent 53de835 commit bbb890d

File tree

17 files changed

+707
-250
lines changed

17 files changed

+707
-250
lines changed

src/main/java/com/pusher/client/Pusher.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import com.pusher.client.connection.ConnectionEventListener;
1919
import com.pusher.client.connection.ConnectionState;
2020
import com.pusher.client.connection.impl.InternalConnection;
21+
import com.pusher.client.user.User;
22+
import com.pusher.client.user.impl.InternalUser;
2123
import com.pusher.client.util.Factory;
2224

2325
/**
@@ -42,6 +44,7 @@ public class Pusher implements Client {
4244
private final InternalConnection connection;
4345
private final ChannelManager channelManager;
4446
private final Factory factory;
47+
private final InternalUser user;
4548

4649
/**
4750
* Creates a new instance of Pusher.
@@ -102,11 +105,17 @@ public Pusher(final String apiKey, final PusherOptions pusherOptions) {
102105

103106
this.pusherOptions = pusherOptions;
104107
this.factory = factory;
105-
connection = factory.getConnection(apiKey, this.pusherOptions);
108+
connection = factory.getConnection(apiKey, this.pusherOptions, this::handleEvent);
106109
channelManager = factory.getChannelManager();
110+
user = factory.newUser(connection, pusherOptions.getUserAuthenticator());
107111
channelManager.setConnection(connection);
108112
}
109113

114+
private void handleEvent(String event, String wholeMessage) {
115+
user.handleEvent(event, wholeMessage);
116+
channelManager.onMessage(event, wholeMessage);
117+
}
118+
110119
/* Connection methods */
111120

112121
/**
@@ -192,6 +201,28 @@ public void disconnect() {
192201
}
193202
}
194203

204+
/**
205+
*
206+
* @return The {@link com.pusher.client.user.User} associated with this Pusher connection.
207+
*/
208+
public User user() {
209+
return user;
210+
}
211+
212+
/**
213+
* Signs in on the Pusher connection as the current user.
214+
*
215+
* <p>
216+
* Requires {@link PusherOptions#setUserAuthenticator} to have been called.
217+
* </p>
218+
*
219+
* @throws IllegalStateException if no {@link UserAuthenticator} has been set.
220+
*/
221+
public void signin() {
222+
throwExceptionIfNoUserAuthenticatorHasBeenSet();
223+
user.signin();
224+
}
225+
195226
/* Subscription methods */
196227

197228
/**
@@ -378,6 +409,13 @@ private void throwExceptionIfNoChannelAuthorizerHasBeenSet() {
378409
}
379410
}
380411

412+
private void throwExceptionIfNoUserAuthenticatorHasBeenSet() {
413+
if (pusherOptions.getUserAuthenticator() == null) {
414+
throw new IllegalStateException(
415+
"Cannot sign in because no UserAuthenticator has been set. Call PusherOptions.setUserAuthenticator() before connecting to Pusher");
416+
}
417+
}
418+
381419
/**
382420
*
383421
* @param channelName The name of the public channel to be retrieved
Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
package com.pusher.client.channel.impl;
2+
3+
import java.util.HashMap;
4+
import java.util.HashSet;
5+
import java.util.LinkedHashMap;
6+
import java.util.Map;
7+
import java.util.Set;
8+
9+
import com.google.gson.Gson;
10+
11+
import com.google.gson.GsonBuilder;
12+
import com.pusher.client.channel.*;
13+
import com.pusher.client.channel.impl.message.SubscribeMessage;
14+
import com.pusher.client.channel.impl.message.UnsubscribeMessage;
15+
import com.pusher.client.util.Factory;
16+
17+
public abstract class BaseChannel implements InternalChannel {
18+
protected final Gson GSON;
19+
private static final String INTERNAL_EVENT_PREFIX = "pusher_internal:";
20+
protected static final String SUBSCRIPTION_SUCCESS_EVENT = "pusher_internal:subscription_succeeded";
21+
private Set<SubscriptionEventListener> globalListeners = new HashSet<SubscriptionEventListener>();
22+
private final Map<String, Set<SubscriptionEventListener>> eventNameToListenerMap = new HashMap<String, Set<SubscriptionEventListener>>();
23+
protected volatile ChannelState state = ChannelState.INITIAL;
24+
private ChannelEventListener eventListener;
25+
private final Factory factory;
26+
private final Object lock = new Object();
27+
28+
public BaseChannel(final Factory factory) {
29+
GsonBuilder gsonBuilder = new GsonBuilder();
30+
gsonBuilder.registerTypeAdapter(PusherEvent.class, new PusherEventDeserializer());
31+
GSON = gsonBuilder.create();
32+
this.factory = factory;
33+
}
34+
35+
/* Channel implementation */
36+
37+
@Override
38+
abstract public String getName();
39+
40+
@Override
41+
public void bind(final String eventName, final SubscriptionEventListener listener) {
42+
validateArguments(eventName, listener);
43+
44+
synchronized (lock) {
45+
Set<SubscriptionEventListener> listeners = eventNameToListenerMap.get(eventName);
46+
if (listeners == null) {
47+
listeners = new HashSet<SubscriptionEventListener>();
48+
eventNameToListenerMap.put(eventName, listeners);
49+
}
50+
listeners.add(listener);
51+
}
52+
}
53+
54+
@Override
55+
public void bindGlobal(SubscriptionEventListener listener) {
56+
validateArguments("", listener);
57+
58+
synchronized(lock) {
59+
globalListeners.add(listener);
60+
}
61+
}
62+
63+
@Override
64+
public void unbind(String eventName, SubscriptionEventListener listener) {
65+
validateArguments(eventName, listener);
66+
67+
synchronized (lock) {
68+
final Set<SubscriptionEventListener> listeners = eventNameToListenerMap.get(eventName);
69+
if (listeners != null) {
70+
listeners.remove(listener);
71+
if (listeners.isEmpty()) {
72+
eventNameToListenerMap.remove(eventName);
73+
}
74+
}
75+
}
76+
}
77+
78+
@Override
79+
public void unbindGlobal(SubscriptionEventListener listener) {
80+
validateArguments("", listener);
81+
82+
synchronized(lock) {
83+
if (globalListeners != null) {
84+
globalListeners.remove(listener);
85+
}
86+
}
87+
}
88+
89+
@Override
90+
public boolean isSubscribed() {
91+
return state == ChannelState.SUBSCRIBED;
92+
}
93+
94+
/* InternalChannel implementation */
95+
96+
@Override
97+
public String toSubscribeMessage() {
98+
return GSON.toJson(new SubscribeMessage(getName()));
99+
}
100+
101+
@Override
102+
public String toUnsubscribeMessage() {
103+
return GSON.toJson(new UnsubscribeMessage(getName()));
104+
}
105+
106+
@Override
107+
public PusherEvent prepareEvent(String event, String message) {
108+
return GSON.fromJson(message, PusherEvent.class);
109+
}
110+
111+
@Override
112+
public void onMessage(String event, String message) {
113+
if (event.equals(SUBSCRIPTION_SUCCESS_EVENT)) {
114+
updateState(ChannelState.SUBSCRIBED);
115+
} else {
116+
final Set<SubscriptionEventListener> listeners = getInterestedListeners(event);
117+
if (listeners != null) {
118+
final PusherEvent pusherEvent = prepareEvent(event, message);
119+
if (pusherEvent != null) {
120+
for (final SubscriptionEventListener listener : listeners) {
121+
factory.queueOnEventThread(new Runnable() {
122+
@Override
123+
public void run() {
124+
listener.onEvent(pusherEvent);
125+
}
126+
});
127+
}
128+
}
129+
}
130+
}
131+
}
132+
133+
@Override
134+
public void updateState(ChannelState state) {
135+
this.state = state;
136+
137+
if (state == ChannelState.SUBSCRIBED && eventListener != null) {
138+
factory.queueOnEventThread(new Runnable() {
139+
@Override
140+
public void run() {
141+
eventListener.onSubscriptionSucceeded(getName());
142+
}
143+
});
144+
}
145+
}
146+
147+
@Override
148+
public void setEventListener(final ChannelEventListener listener) {
149+
eventListener = listener;
150+
}
151+
152+
@Override
153+
public ChannelEventListener getEventListener() {
154+
return eventListener;
155+
}
156+
157+
/* Comparable implementation */
158+
159+
@Override
160+
public int compareTo(final InternalChannel other) {
161+
return getName().compareTo(other.getName());
162+
}
163+
164+
/* Implementation detail */
165+
166+
@Override
167+
public String toString() {
168+
return String.format("[Channel: name=%s]", getName());
169+
}
170+
171+
private void validateArguments(final String eventName, final SubscriptionEventListener listener) {
172+
173+
if (eventName == null) {
174+
throw new IllegalArgumentException("Cannot bind or unbind to channel " + getName() + " with a null event name");
175+
}
176+
177+
if (listener == null) {
178+
throw new IllegalArgumentException("Cannot bind or unbind to channel " + getName() + " with a null listener");
179+
}
180+
181+
if (eventName.startsWith(INTERNAL_EVENT_PREFIX)) {
182+
throw new IllegalArgumentException("Cannot bind or unbind channel " + getName()
183+
+ " with an internal event name such as " + eventName);
184+
}
185+
}
186+
187+
protected Set<SubscriptionEventListener> getInterestedListeners(String event) {
188+
synchronized (lock) {
189+
Set<SubscriptionEventListener> listeners = new HashSet<SubscriptionEventListener>();
190+
191+
final Set<SubscriptionEventListener> sharedListeners =
192+
eventNameToListenerMap.get(event);
193+
194+
if (sharedListeners != null ) {
195+
listeners.addAll(sharedListeners);
196+
}
197+
if (!globalListeners.isEmpty()) {
198+
listeners.addAll(globalListeners);
199+
}
200+
201+
if (listeners.isEmpty()){
202+
return null;
203+
}
204+
205+
return listeners;
206+
}
207+
}
208+
}

0 commit comments

Comments
 (0)