2222import io .cdap .plugin .salesforce .plugin .OAuthInfo ;
2323import org .awaitility .Awaitility ;
2424import org .awaitility .core .ConditionTimeoutException ;
25+ import org .cometd .bayeux .Channel ;
26+ import org .cometd .bayeux .client .ClientSessionChannel ;
2527import org .cometd .client .BayeuxClient ;
2628import org .cometd .client .transport .ClientTransport ;
2729import org .cometd .client .transport .LongPollingTransport ;
@@ -61,6 +63,7 @@ public class SalesforcePushTopicListener {
6163
6264 private final AuthenticatorCredentials credentials ;
6365 private final String topic ;
66+ private BayeuxClient bayeuxClient ;
6467
6568 private JSONContext .Client jsonContext ;
6669
@@ -75,13 +78,9 @@ public SalesforcePushTopicListener(AuthenticatorCredentials credentials, String
7578 */
7679 public void start () {
7780 try {
78- BayeuxClient bayeuxClient = getClient (credentials );
79- waitForHandshake (bayeuxClient , HANDSHAKE_TIMEOUT_MS , HANDSHAKE_CHECK_INTERVAL_MS );
80- LOG .debug ("Client handshake done" );
81- bayeuxClient .getChannel ("/topic/" + topic ).subscribe ((channel , message ) -> {
82- messagesQueue .add (jsonContext .getGenerator ().generate (message .getDataAsMap ()));
83- });
84-
81+ createSalesforceListener ();
82+ waitForHandshake ();
83+ subscribe ();
8584 } catch (Exception e ) {
8685 throw new RuntimeException ("Could not start client" , e );
8786 }
@@ -92,7 +91,7 @@ public void start() {
9291 * specified wait time if necessary for an element to become available.
9392 *
9493 * @param timeout how long to wait before giving up
95- * @param unit timeunit of timeout
94+ * @param unit timeunit of timeout
9695 * @return the message, or {@code null} if the specified
9796 * waiting time elapses before an element is available
9897 * @throws InterruptedException blocking call is interrupted
@@ -128,21 +127,106 @@ protected void customize(Request exchange) {
128127 };
129128
130129 // Now set up the Bayeux client itself
131- BayeuxClient client = new BayeuxClient (oAuthInfo .getInstanceURL () + DEFAULT_PUSH_ENDPOINT , transport );
132- client .handshake ();
130+ return new BayeuxClient (oAuthInfo .getInstanceURL () + DEFAULT_PUSH_ENDPOINT , transport );
131+ }
132+
133+ public void createSalesforceListener () throws Exception {
134+ bayeuxClient = getClient (credentials );
135+ bayeuxClient .getChannel (Channel .META_HANDSHAKE ).addListener
136+ ((ClientSessionChannel .MessageListener ) (channel , message ) -> {
137+
138+ boolean success = message .isSuccessful ();
139+ if (!success ) {
140+ String error = (String ) message .get ("error" );
141+ if (error != null ) {
142+ throw new RuntimeException (String .format ("Error in meta handshake, errorMessage: %s" , error ));
143+ } else if (message .get ("exception" ) instanceof Exception ) {
144+ Exception exception = (Exception ) message .get ("exception" );
145+ if (exception != null ) {
146+ throw new RuntimeException (String .format ("Exception in meta handshake %s" , exception ));
147+ }
148+ } else {
149+ throw new RuntimeException (String .format ("Error in meta handshake, message: %s" , message ));
150+ }
151+
152+ }
153+ });
154+
155+ bayeuxClient .getChannel (Channel .META_CONNECT ).addListener (
156+ (ClientSessionChannel .MessageListener ) (channel , message ) -> {
157+
158+ boolean success = message .isSuccessful ();
159+ if (!success ) {
160+ String error = (String ) message .get ("error" );
161+ Map <String , Object > advice = message .getAdvice ();
162+
163+ if (error != null ) {
164+ LOG .error ("Error during CONNECT: {}" , error );
165+ LOG .debug ("Advice during CONNECT: {}" , advice );
166+ }
167+ // Error Codes Reference in Salesforce Streaming :
168+ // https://developer.salesforce.com/docs/atlas.en-us.api_streaming.meta/api_streaming/streaming_error_codes
169+ // .htm
170+ if (advice .get ("reconnect" ).equals ("handshake" )) {
171+ LOG .debug ("Reconnecting to Salesforce Push Topic" );
172+ try {
173+ reconnectToTopic ();
174+ } catch (Exception e ) {
175+ throw new RuntimeException ("Error in reconnecting to salesforce " , e );
176+ }
177+ } else {
178+ throw new RuntimeException (String .format ("Error in meta connect, errorMessage: %s Advice: %s" , error ,
179+ advice ));
180+ }
181+ }
182+ });
183+
184+ bayeuxClient .getChannel (Channel .META_SUBSCRIBE ).addListener (
185+ (ClientSessionChannel .MessageListener ) (channel , message ) -> {
186+
187+ boolean success = message .isSuccessful ();
188+ if (!success ) {
189+ String error = (String ) message .get ("error" );
190+ if (error != null ) {
191+ throw new RuntimeException (String .format ("Error in meta subscribe, errorMessage: %s" , error ));
192+ } else {
193+ throw new RuntimeException (String .format ("Error in meta subscribe, message: %s" , message ));
194+ }
195+ }
196+ });
197+
198+ }
133199
134- return client ;
200+ public void reconnectToTopic () throws Exception {
201+ disconnectStream ();
202+ createSalesforceListener ();
203+ waitForHandshake ();
204+ subscribe ();
135205 }
136206
137- private void waitForHandshake (BayeuxClient client ,
138- long timeoutInMilliseconds , long intervalInMilliseconds ) {
207+ private void waitForHandshake () {
208+ bayeuxClient .handshake ();
209+
139210 try {
140211 Awaitility .await ()
141- .atMost (timeoutInMilliseconds , TimeUnit .MILLISECONDS )
142- .pollInterval (intervalInMilliseconds , TimeUnit .MILLISECONDS )
143- .until (() -> client .isHandshook ());
212+ .atMost (SalesforcePushTopicListener . HANDSHAKE_TIMEOUT_MS , TimeUnit .MILLISECONDS )
213+ .pollInterval (SalesforcePushTopicListener . HANDSHAKE_CHECK_INTERVAL_MS , TimeUnit .MILLISECONDS )
214+ .until (() -> bayeuxClient .isHandshook ());
144215 } catch (ConditionTimeoutException e ) {
145216 throw new IllegalStateException ("Client could not handshake with Salesforce server" , e );
146217 }
218+ LOG .debug ("Client handshake done" );
219+ }
220+
221+ private void subscribe () {
222+ bayeuxClient .getChannel ("/topic/" + topic ).subscribe ((channel , message ) -> {
223+ LOG .debug ("Message : {}" , message );
224+ messagesQueue .add (jsonContext .getGenerator ().generate (message .getDataAsMap ()));
225+ });
226+ }
227+
228+ public void disconnectStream () {
229+ bayeuxClient .getChannel ("/topic/" + topic ).unsubscribe ();
230+ bayeuxClient .disconnect ();
147231 }
148232}
0 commit comments