diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java index 11e83add51..da1b6d6eda 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java @@ -40,6 +40,7 @@ import java.time.Duration; import java.time.Instant; import java.util.BitSet; +import java.util.EnumSet; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -262,6 +263,9 @@ public void close() { */ private static final Map CHANNEL_USAGE = new HashMap<>(); + private static final EnumSet RETRYABLE_ERROR_CODES = + EnumSet.of(ErrorCode.DEADLINE_EXCEEDED, ErrorCode.RESOURCE_EXHAUSTED, ErrorCode.UNAVAILABLE); + private final BitSet channelUsage; private final int numChannels; @@ -358,11 +362,19 @@ public void close() { SettableApiFuture.create(); this.readWriteBeginTransactionReferenceFuture = SettableApiFuture.create(); this.multiplexedSessionReference = new AtomicReference<>(initialSessionReferenceFuture); + asyncCreateMultiplexedSession(initialSessionReferenceFuture); + maybeWaitForSessionCreation( + sessionClient.getSpanner().getOptions().getSessionPoolOptions(), + initialSessionReferenceFuture); + } + + private void asyncCreateMultiplexedSession( + SettableApiFuture sessionReferenceFuture) { this.sessionClient.asyncCreateMultiplexedSession( new SessionConsumer() { @Override public void onSessionReady(SessionImpl session) { - initialSessionReferenceFuture.set(session.getSessionReference()); + sessionReferenceFuture.set(session.getSessionReference()); // only start the maintainer if we actually managed to create a session in the first // place. maintainer.start(); @@ -395,33 +407,62 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount // Mark multiplexes sessions as unimplemented and fall back to regular sessions if // UNIMPLEMENTED is returned. maybeMarkUnimplemented(t); - initialSessionReferenceFuture.setException(t); + sessionReferenceFuture.setException(t); } }); - maybeWaitForSessionCreation( - sessionClient.getSpanner().getOptions().getSessionPoolOptions(), - initialSessionReferenceFuture); } void setPool(SessionPool pool) { this.pool = pool; } - private static void maybeWaitForSessionCreation( - SessionPoolOptions sessionPoolOptions, ApiFuture future) { + private void maybeWaitForSessionCreation( + SessionPoolOptions sessionPoolOptions, + SettableApiFuture initialSessionReferenceFuture) { Duration waitDuration = sessionPoolOptions.getWaitForMinSessions(); if (waitDuration != null && !waitDuration.isZero()) { - long timeoutMillis = waitDuration.toMillis(); - try { - future.get(timeoutMillis, TimeUnit.MILLISECONDS); - } catch (ExecutionException executionException) { - throw SpannerExceptionFactory.asSpannerException(executionException.getCause()); - } catch (InterruptedException interruptedException) { - throw SpannerExceptionFactory.propagateInterrupt(interruptedException); - } catch (TimeoutException timeoutException) { - throw SpannerExceptionFactory.newSpannerException( - ErrorCode.DEADLINE_EXCEEDED, - "Timed out after waiting " + timeoutMillis + "ms for multiplexed session creation"); + + SpannerException lastException = null; + SettableApiFuture sessionReferenceFuture = initialSessionReferenceFuture; + Duration remainingTime; + + Instant endTime = Instant.now().plus(waitDuration); + while ((remainingTime = Duration.between(Instant.now(), endTime)).toMillis() > 0) { + // If any exception is thrown, then retry the multiplexed session creation + if (sessionReferenceFuture == null) { + sessionReferenceFuture = SettableApiFuture.create(); + asyncCreateMultiplexedSession(sessionReferenceFuture); + this.multiplexedSessionReference.set(sessionReferenceFuture); + } + try { + sessionReferenceFuture.get(remainingTime.toMillis(), TimeUnit.MILLISECONDS); + lastException = null; + break; + } catch (ExecutionException executionException) { + lastException = SpannerExceptionFactory.asSpannerException(executionException.getCause()); + } catch (InterruptedException interruptedException) { + lastException = SpannerExceptionFactory.propagateInterrupt(interruptedException); + } catch (TimeoutException timeoutException) { + lastException = + SpannerExceptionFactory.newSpannerException( + ErrorCode.DEADLINE_EXCEEDED, + "Timed out after waiting " + + waitDuration.toMillis() + + "ms for multiplexed session creation"); + } + // if any exception is thrown, then set the session reference to null to retry the + // multiplexed session creation only if the error code is DEADLINE EXCEEDED, UNAVAILABLE or + // RESOURCE_EXHAUSTED + if (RETRYABLE_ERROR_CODES.contains(lastException.getErrorCode())) { + sessionReferenceFuture = null; + } else { + break; + } + } + // if the wait time elapsed and multiplexed session fetch failed then throw the last exception + // that we have received + if (lastException != null) { + throw lastException; } } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java index a47aecdccc..d495a6ff63 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java @@ -542,16 +542,16 @@ void simulateExecutionTime( boolean stickyGlobalExceptions, CountDownLatch freezeLock) { Uninterruptibles.awaitUninterruptibly(freezeLock); - checkException(globalExceptions, stickyGlobalExceptions); - if (streamIndices.isEmpty()) { - checkException(this.exceptions, stickyException); - } if (minimumExecutionTime > 0 || randomExecutionTime > 0) { Uninterruptibles.sleepUninterruptibly( (randomExecutionTime == 0 ? 0 : RANDOM.nextInt(randomExecutionTime)) + minimumExecutionTime, TimeUnit.MILLISECONDS); } + checkException(globalExceptions, stickyGlobalExceptions); + if (streamIndices.isEmpty()) { + checkException(this.exceptions, stickyException); + } } private static void checkException(Queue exceptions, boolean keepException) { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index 0448656475..ee8c286da5 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -210,6 +210,271 @@ public void testMaintainerMaintainsMultipleClients() { } } + @Test + public void testRetryWithTheSessionCreationWaitTime() { + mockSpanner.setCreateSessionExecutionTime( + SimulatedExecutionTime.ofExceptions( + Arrays.asList( + Status.DEADLINE_EXCEEDED + .withDescription( + "CallOptions deadline exceeded after 22.986872393s. " + + "Name resolution delay 6.911918521 seconds. [closed=[], " + + "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]") + .asRuntimeException(), + Status.DEADLINE_EXCEEDED + .withDescription( + "CallOptions deadline exceeded after 22.986872393s. " + + "Name resolution delay 6.911918521 seconds. [closed=[], " + + "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]") + .asRuntimeException()))); + + Spanner testSpanner = + SpannerOptions.newBuilder() + .setProjectId("test-project") + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setUseMultiplexedSession(true) + .setUseMultiplexedSessionForRW(true) + .setUseMultiplexedSessionPartitionedOps(true) + .setWaitForMinSessionsDuration(Duration.ofSeconds(1)) + .setFailOnSessionLeak() + .build()) + .build() + .getService(); + + DatabaseClientImpl client = + (DatabaseClientImpl) testSpanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) { + //noinspection StatementWithEmptyBody + while (resultSet.next()) { + // ignore + } + } + + List createSessionRequests = + mockSpanner.getRequestsOfType(CreateSessionRequest.class); + assertEquals(3, createSessionRequests.size()); + + List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); + assertEquals(1, requests.size()); + + assertNotNull(client.multiplexedSessionDatabaseClient); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get()); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get()); + + testSpanner.close(); + } + + @Test + public void testRetryWithTheDatabaseNotFoundExceptionWithSessionCreationWaitTime() { + mockSpanner.setCreateSessionExecutionTime( + SimulatedExecutionTime.ofExceptions( + Collections.singletonList( + Status.NOT_FOUND.withDescription("Database not found.").asRuntimeException()))); + + Spanner testSpanner = + SpannerOptions.newBuilder() + .setProjectId("test-project") + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setUseMultiplexedSession(true) + .setUseMultiplexedSessionForRW(true) + .setUseMultiplexedSessionPartitionedOps(true) + .setWaitForMinSessionsDuration(Duration.ofMillis(200)) + .setFailOnSessionLeak() + .build()) + .build() + .getService(); + + assertThrows( + SpannerException.class, () -> testSpanner.getDatabaseClient(DatabaseId.of("p", "i", "d"))); + + List createSessionRequests = + mockSpanner.getRequestsOfType(CreateSessionRequest.class); + assertEquals(1, createSessionRequests.size()); + + List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); + assertEquals(0, requests.size()); + + testSpanner.close(); + } + + @Test + public void testRetryWithNoSessionCreationWaitTime() { + mockSpanner.setCreateSessionExecutionTime( + SimulatedExecutionTime.ofExceptions( + Collections.singletonList( + Status.DEADLINE_EXCEEDED + .withDescription( + "CallOptions deadline exceeded after 22.986872393s. " + + "Name resolution delay 6.911918521 seconds. [closed=[], " + + "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]") + .asRuntimeException()))); + + Spanner testSpanner = + SpannerOptions.newBuilder() + .setProjectId("test-project") + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setUseMultiplexedSession(true) + .setUseMultiplexedSessionForRW(true) + .setUseMultiplexedSessionPartitionedOps(true) + .setFailOnSessionLeak() + .build()) + .build() + .getService(); + + DatabaseClientImpl client = + (DatabaseClientImpl) testSpanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + SpannerException spannerException = + assertThrows( + SpannerException.class, + () -> { + try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) { + //noinspection StatementWithEmptyBody + while (resultSet.next()) { + // ignore + } + } + }); + assertEquals(ErrorCode.DEADLINE_EXCEEDED, spannerException.getErrorCode()); + + List createSessionRequests = + mockSpanner.getRequestsOfType(CreateSessionRequest.class); + assertEquals(1, createSessionRequests.size()); + + List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); + assertEquals(0, requests.size()); + + testSpanner.close(); + } + + @Test + public void testRetryWithDelayedInResponseExceedsSessionCreationWaitTime() { + mockSpanner.setCreateSessionExecutionTime( + SimulatedExecutionTime.ofMinimumAndRandomTimeAndExceptions( + 150, + 0, + Arrays.asList( + Status.DEADLINE_EXCEEDED + .withDescription( + "CallOptions deadline exceeded after 22.986872393s. " + + "Name resolution delay 6.911918521 seconds. [closed=[], " + + "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]") + .asRuntimeException(), + Status.UNAVAILABLE + .withDescription( + "CallOptions deadline exceeded after 22.986872393s. " + + "Name resolution delay 6.911918521 seconds. [closed=[], " + + "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]") + .asRuntimeException()))); + + Spanner testSpanner = + SpannerOptions.newBuilder() + .setProjectId("test-project") + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setUseMultiplexedSession(true) + .setUseMultiplexedSessionForRW(true) + .setUseMultiplexedSessionPartitionedOps(true) + .setWaitForMinSessionsDuration(Duration.ofMillis(200)) + .setFailOnSessionLeak() + .build()) + .build() + .getService(); + + SpannerException spannerException = + assertThrows( + SpannerException.class, + () -> { + DatabaseClientImpl client = + (DatabaseClientImpl) testSpanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) { + //noinspection StatementWithEmptyBody + while (resultSet.next()) { + // ignore + } + } + }); + assertEquals(ErrorCode.DEADLINE_EXCEEDED, spannerException.getErrorCode()); + + List createSessionRequests = + mockSpanner.getRequestsOfType(CreateSessionRequest.class); + assertEquals(2, createSessionRequests.size()); + + List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); + assertEquals(0, requests.size()); + + testSpanner.close(); + } + + @Test + public void testRetryWithDelayInExceptionWithInSessionCreationWaitTime() { + mockSpanner.setCreateSessionExecutionTime( + SimulatedExecutionTime.ofMinimumAndRandomTimeAndExceptions( + 50, + 0, + Arrays.asList( + Status.DEADLINE_EXCEEDED + .withDescription( + "CallOptions deadline exceeded after 22.986872393s. " + + "Name resolution delay 6.911918521 seconds. [closed=[], " + + "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]") + .asRuntimeException(), + Status.DEADLINE_EXCEEDED + .withDescription( + "CallOptions deadline exceeded after 22.986872393s. " + + "Name resolution delay 6.911918521 seconds. [closed=[], " + + "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]") + .asRuntimeException()))); + + Spanner testSpanner = + SpannerOptions.newBuilder() + .setProjectId("test-project") + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setUseMultiplexedSession(true) + .setUseMultiplexedSessionForRW(true) + .setUseMultiplexedSessionPartitionedOps(true) + .setWaitForMinSessionsDuration(Duration.ofMillis(200)) + .setFailOnSessionLeak() + .build()) + .build() + .getService(); + + DatabaseClientImpl client = + (DatabaseClientImpl) testSpanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) { + //noinspection StatementWithEmptyBody + while (resultSet.next()) { + // ignore + } + } + + List createSessionRequests = + mockSpanner.getRequestsOfType(CreateSessionRequest.class); + assertEquals(3, createSessionRequests.size()); + + List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); + assertEquals(1, requests.size()); + + testSpanner.close(); + } + @Test public void testUnimplementedErrorOnCreation_fallsBackToRegularSessions() { mockSpanner.setCreateSessionExecutionTime(