Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions client/src/main/java/io/split/engine/common/PushManagerImp.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,20 @@ public static PushManagerImp build(Synchronizer synchronizer,

@Override
public void start() {
_log.debug("#1 - Start PushManagerImp");
try {
lock.lock();
AuthenticationResponse response = _authApiClient.Authenticate();
_log.debug(String.format("Auth service response pushEnabled: %s", response.isPushEnabled()));
if (response.isPushEnabled() && startSse(response.getToken(), response.getChannels())) {
_log.debug("#2 - PushManagerImp connected");
_expirationTime.set(_streamingTokenRefreshRate);
_telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.TOKEN_REFRESH.getType(),
response.getExpiration(), System.currentTimeMillis()));
return;
}

_log.debug("#3 - PushManagerImp error");
cleanUpResources();
if (response.isRetry()) {
_log.debug(String.format("Handling retry error response"));
Expand All @@ -125,6 +128,7 @@ public void start() {
_log.debug(String.format("Auth service response is disabled: %s", response.getToken()));
_pushStatusTracker.forcePushDisable();
}
_log.debug("#4 - PushManagerImp error");
} catch (Exception e) {
_log.debug("Exception in PushManager start: " + e.getMessage());
} finally {
Expand All @@ -134,10 +138,11 @@ public void start() {

@Override
public void stop() {
_log.debug("#1 - Stopping PushManagerImp");
try {
lock.lock();
_log.debug("Stopping PushManagerImp");
cleanUpResources();
_log.debug("#2 - Stopped PushManagerImp");
} finally {
lock.unlock();
}
Expand All @@ -147,9 +152,11 @@ public void stop() {
public void scheduleConnectionReset() {
_log.debug(String.format("scheduleNextTokenRefresh in %s SECONDS", _expirationTime));
_nextTokenRefreshTask = _scheduledExecutorService.schedule(() -> {
_log.debug("Starting scheduleNextTokenRefresh ...");
_log.debug("#1 - Starting scheduleNextTokenRefresh ...");
stop();
_log.debug("#2 - Finished to stop all streaming engine");
start();
_log.debug("#3 - Finished to start streaming connection");
}, _expirationTime.get(), TimeUnit.SECONDS);
}

Expand Down Expand Up @@ -184,11 +191,15 @@ public void stopWorkers() {
}

private void cleanUpResources() {
_log.debug("Starting cleanUpResources - #1")
_eventSourceClient.stop();
_log.debug("cleanUpResources - #2")
stopWorkers();
if (_nextTokenRefreshTask != null) {
_log.debug("Cancel nextTokenRefreshTask");
_nextTokenRefreshTask.cancel(false);
_log.debug("Finished cleanUpResources - #3 - Finished cancel nextTokenRefreshTask")
}
_log.debug("Finished cleanUpResources - #4")
}
}
}