|
26 | 26 | import com.google.api.gax.core.GaxProperties; |
27 | 27 | import com.google.api.gax.grpc.GaxGrpcProperties; |
28 | 28 | import com.google.api.gax.grpc.GrpcCallContext; |
| 29 | +import com.google.api.gax.grpc.GrpcCallSettings; |
| 30 | +import com.google.api.gax.grpc.GrpcStubCallableFactory; |
29 | 31 | import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; |
30 | 32 | import com.google.api.gax.longrunning.OperationFuture; |
31 | 33 | import com.google.api.gax.retrying.ResultRetryAlgorithm; |
|
35 | 37 | import com.google.api.gax.rpc.ApiCallContext; |
36 | 38 | import com.google.api.gax.rpc.ApiClientHeaderProvider; |
37 | 39 | import com.google.api.gax.rpc.ApiException; |
| 40 | +import com.google.api.gax.rpc.ClientContext; |
38 | 41 | import com.google.api.gax.rpc.FixedHeaderProvider; |
39 | 42 | import com.google.api.gax.rpc.HeaderProvider; |
40 | 43 | import com.google.api.gax.rpc.InstantiatingWatchdogProvider; |
|
44 | 47 | import com.google.api.gax.rpc.StatusCode; |
45 | 48 | import com.google.api.gax.rpc.StreamController; |
46 | 49 | import com.google.api.gax.rpc.TransportChannelProvider; |
| 50 | +import com.google.api.gax.rpc.UnaryCallSettings; |
| 51 | +import com.google.api.gax.rpc.UnaryCallable; |
47 | 52 | import com.google.api.gax.rpc.UnavailableException; |
48 | 53 | import com.google.api.gax.rpc.WatchdogProvider; |
49 | 54 | import com.google.api.pathtemplate.PathTemplate; |
|
59 | 64 | import com.google.cloud.spanner.SpannerOptions.CallCredentialsProvider; |
60 | 65 | import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStub; |
61 | 66 | import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStubSettings; |
| 67 | +import com.google.cloud.spanner.admin.database.v1.stub.GrpcDatabaseAdminCallableFactory; |
62 | 68 | import com.google.cloud.spanner.admin.database.v1.stub.GrpcDatabaseAdminStub; |
63 | 69 | import com.google.cloud.spanner.admin.instance.v1.stub.GrpcInstanceAdminStub; |
64 | 70 | import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStub; |
|
72 | 78 | import com.google.common.base.Preconditions; |
73 | 79 | import com.google.common.collect.ImmutableList; |
74 | 80 | import com.google.common.collect.ImmutableMap; |
| 81 | +import com.google.common.collect.ImmutableSet; |
75 | 82 | import com.google.common.util.concurrent.RateLimiter; |
76 | 83 | import com.google.common.util.concurrent.ThreadFactoryBuilder; |
77 | 84 | import com.google.iam.v1.GetIamPolicyRequest; |
|
157 | 164 | import java.util.LinkedList; |
158 | 165 | import java.util.List; |
159 | 166 | import java.util.Map; |
| 167 | +import java.util.Set; |
160 | 168 | import java.util.concurrent.Callable; |
161 | 169 | import java.util.concurrent.CancellationException; |
162 | 170 | import java.util.concurrent.ConcurrentHashMap; |
@@ -443,7 +451,45 @@ public GapicSpannerRpc(final SpannerOptions options) { |
443 | 451 | .setCredentialsProvider(credentialsProvider) |
444 | 452 | .setStreamWatchdogProvider(watchdogProvider) |
445 | 453 | .build(); |
446 | | - this.databaseAdminStub = GrpcDatabaseAdminStub.create(this.databaseAdminStubSettings); |
| 454 | + |
| 455 | + // Automatically retry RESOURCE_EXHAUSTED for GetOperation if auto-throttling of |
| 456 | + // administrative requests has been set. The GetOperation RPC is called repeatedly by gax |
| 457 | + // while polling long-running operations for their progress and can also cause these errors. |
| 458 | + // The default behavior is not to retry these errors, and this option should normally only be |
| 459 | + // enabled for (integration) testing. |
| 460 | + if (options.isAutoThrottleAdministrativeRequests()) { |
| 461 | + GrpcStubCallableFactory factory = |
| 462 | + new GrpcDatabaseAdminCallableFactory() { |
| 463 | + @Override |
| 464 | + public <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUnaryCallable( |
| 465 | + GrpcCallSettings<RequestT, ResponseT> grpcCallSettings, |
| 466 | + UnaryCallSettings<RequestT, ResponseT> callSettings, |
| 467 | + ClientContext clientContext) { |
| 468 | + // Make GetOperation retry on RESOURCE_EXHAUSTED to prevent polling operations from |
| 469 | + // failing with an Administrative requests limit exceeded error. |
| 470 | + if (grpcCallSettings |
| 471 | + .getMethodDescriptor() |
| 472 | + .getFullMethodName() |
| 473 | + .equals("google.longrunning.Operations/GetOperation")) { |
| 474 | + Set<StatusCode.Code> codes = |
| 475 | + ImmutableSet.<StatusCode.Code>builderWithExpectedSize( |
| 476 | + callSettings.getRetryableCodes().size() + 1) |
| 477 | + .addAll(callSettings.getRetryableCodes()) |
| 478 | + .add(StatusCode.Code.RESOURCE_EXHAUSTED) |
| 479 | + .build(); |
| 480 | + callSettings = callSettings.toBuilder().setRetryableCodes(codes).build(); |
| 481 | + } |
| 482 | + return super.createUnaryCallable(grpcCallSettings, callSettings, clientContext); |
| 483 | + } |
| 484 | + }; |
| 485 | + this.databaseAdminStub = |
| 486 | + new GrpcDatabaseAdminStubWithCustomCallableFactory( |
| 487 | + databaseAdminStubSettings, |
| 488 | + ClientContext.create(databaseAdminStubSettings), |
| 489 | + factory); |
| 490 | + } else { |
| 491 | + this.databaseAdminStub = GrpcDatabaseAdminStub.create(databaseAdminStubSettings); |
| 492 | + } |
447 | 493 |
|
448 | 494 | // Check whether the SPANNER_EMULATOR_HOST env var has been set, and if so, if the emulator is |
449 | 495 | // actually running. |
@@ -504,9 +550,9 @@ private static void checkEmulatorConnection( |
504 | 550 |
|
505 | 551 | private static final RetrySettings ADMIN_REQUESTS_LIMIT_EXCEEDED_RETRY_SETTINGS = |
506 | 552 | RetrySettings.newBuilder() |
507 | | - .setInitialRetryDelay(Duration.ofSeconds(2L)) |
508 | | - .setRetryDelayMultiplier(1.5) |
509 | | - .setMaxRetryDelay(Duration.ofSeconds(15L)) |
| 553 | + .setInitialRetryDelay(Duration.ofSeconds(5L)) |
| 554 | + .setRetryDelayMultiplier(2.0) |
| 555 | + .setMaxRetryDelay(Duration.ofSeconds(60L)) |
510 | 556 | .setMaxAttempts(10) |
511 | 557 | .build(); |
512 | 558 |
|
@@ -1021,6 +1067,11 @@ public OperationFuture<Empty, UpdateDatabaseDdlMetadata> call() throws Exception |
1021 | 1067 | throw newSpannerException(e); |
1022 | 1068 | } catch (ExecutionException e) { |
1023 | 1069 | Throwable t = e.getCause(); |
| 1070 | + SpannerException se = SpannerExceptionFactory.asSpannerException(t); |
| 1071 | + if (se instanceof AdminRequestsPerMinuteExceededException) { |
| 1072 | + // Propagate this to trigger a retry. |
| 1073 | + throw se; |
| 1074 | + } |
1024 | 1075 | if (t instanceof AlreadyExistsException) { |
1025 | 1076 | String operationName = |
1026 | 1077 | OPERATION_NAME_TEMPLATE.instantiate( |
|
0 commit comments