-
Notifications
You must be signed in to change notification settings - Fork 1.1k
feat(gax): implement dynamic channel refreshing on 401 retries #13212
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -82,6 +82,7 @@ class ChannelPool extends ManagedChannel { | |
| private ScheduledFuture<?> resizeFuture = null; | ||
|
|
||
| private final Object entryWriteLock = new Object(); | ||
| private long lastRefreshTimeNanos = 0; | ||
| @VisibleForTesting final AtomicReference<ImmutableList<Entry>> entries = new AtomicReference<>(); | ||
| private final AtomicInteger indexTicker = new AtomicInteger(); | ||
| private final String authority; | ||
|
|
@@ -441,6 +442,13 @@ void refresh() { | |
| // - then thread2 will shut down channel that thread1 will put back into circulation (after it | ||
| // replaces the list) | ||
| synchronized (entryWriteLock) { | ||
| long now = System.nanoTime(); | ||
| if (now - lastRefreshTimeNanos < TimeUnit.SECONDS.toNanos(5)) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| LOG.fine("Channel pool was refreshed recently, skipping duplicate refresh"); | ||
| return; | ||
| } | ||
| lastRefreshTimeNanos = now; | ||
|
|
||
| LOG.fine("Refreshing all channels"); | ||
| ArrayList<Entry> newEntries = new ArrayList<>(entries.get()); | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -111,6 +111,10 @@ public RetryingFuture<ResponseT> createFuture( | |
| */ | ||
| @Override | ||
| public ApiFuture<ResponseT> submit(RetryingFuture<ResponseT> retryingFuture) { | ||
| if ("true".equalsIgnoreCase(System.getenv("isMwlidEnvironment"))) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| checkForFailedChannelRefresh(retryingFuture); | ||
| } | ||
|
|
||
| try { | ||
| ListenableFuture<ResponseT> attemptFuture = | ||
| scheduler.schedule( | ||
|
|
@@ -122,4 +126,27 @@ public ApiFuture<ResponseT> submit(RetryingFuture<ResponseT> retryingFuture) { | |
| return ApiFutures.immediateFailedFuture(e); | ||
| } | ||
| } | ||
|
|
||
| private void checkForFailedChannelRefresh(RetryingFuture<ResponseT> retryingFuture) { | ||
| ApiFuture<ResponseT> lastAttemptResult = retryingFuture.peekAttemptResult(); | ||
| if (lastAttemptResult != null && lastAttemptResult.isDone()) { | ||
| try { | ||
| lastAttemptResult.get(); | ||
| } catch (java.util.concurrent.ExecutionException e) { | ||
| Throwable cause = e.getCause(); | ||
| if (cause instanceof com.google.api.gax.rpc.UnauthenticatedException) { | ||
| RetryingContext context = retryingFuture.getRetryingContext(); | ||
| if (context instanceof com.google.api.gax.rpc.ApiCallContext) { | ||
| com.google.api.gax.rpc.TransportChannel transportChannel = | ||
| ((com.google.api.gax.rpc.ApiCallContext) context).getTransportChannel(); | ||
|
Comment on lines
+137
to
+141
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| if (transportChannel != null) { | ||
| transportChannel.refresh(); | ||
| } | ||
| } | ||
| } | ||
| } catch (Exception ignored) { | ||
| // Ignore cancellations or interruptions | ||
| } | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,6 +38,10 @@ class ApiResultRetryAlgorithm<ResponseT> extends BasicResultRetryAlgorithm<Respo | |
| /** Returns true if previousThrowable is an {@link ApiException} that is retryable. */ | ||
| @Override | ||
| public boolean shouldRetry(Throwable previousThrowable, ResponseT previousResponse) { | ||
| if ("true".equalsIgnoreCase(System.getenv("isMwlidEnvironment")) | ||
| && previousThrowable instanceof UnauthenticatedException) { | ||
| return true; | ||
| } | ||
|
Comment on lines
+41
to
+44
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| return (previousThrowable instanceof ApiException) | ||
| && ((ApiException) previousThrowable).isRetryable(); | ||
| } | ||
|
|
@@ -51,6 +55,10 @@ public boolean shouldRetry(Throwable previousThrowable, ResponseT previousRespon | |
| @Override | ||
| public boolean shouldRetry( | ||
| RetryingContext context, Throwable previousThrowable, ResponseT previousResponse) { | ||
| if ("true".equalsIgnoreCase(System.getenv("isMwlidEnvironment")) | ||
| && previousThrowable instanceof UnauthenticatedException) { | ||
| return true; | ||
| } | ||
| if (context.getRetryableCodes() != null) { | ||
| // Ignore the isRetryable() value of the throwable if the RetryingContext has a specific list | ||
| // of codes that should be retried. | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Initializing
lastRefreshTimeNanosto0can lead to the first refresh being skipped ifSystem.nanoTime()returns a value close to zero (which is possible depending on the JVM's arbitrary time origin). Additionally, the 5-second debounce interval should be defined as a constant.