Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.time.Duration;
import java.time.Instant;
import java.util.BitSet;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -262,6 +263,9 @@ public void close() {
*/
private static final Map<SpannerImpl, BitSet> CHANNEL_USAGE = new HashMap<>();

private static final EnumSet<ErrorCode> RETRYABLE_ERROR_CODES =
EnumSet.of(ErrorCode.DEADLINE_EXCEEDED, ErrorCode.RESOURCE_EXHAUSTED, ErrorCode.UNAVAILABLE);

private final BitSet channelUsage;

private final int numChannels;
Expand Down Expand Up @@ -358,11 +362,19 @@ public void close() {
SettableApiFuture.create();
this.readWriteBeginTransactionReferenceFuture = SettableApiFuture.create();
this.multiplexedSessionReference = new AtomicReference<>(initialSessionReferenceFuture);
asyncCreateMultiplexedSession(initialSessionReferenceFuture);
maybeWaitForSessionCreation(
sessionClient.getSpanner().getOptions().getSessionPoolOptions(),
initialSessionReferenceFuture);
}

private void asyncCreateMultiplexedSession(
SettableApiFuture<SessionReference> sessionReferenceFuture) {
this.sessionClient.asyncCreateMultiplexedSession(
new SessionConsumer() {
@Override
public void onSessionReady(SessionImpl session) {
initialSessionReferenceFuture.set(session.getSessionReference());
sessionReferenceFuture.set(session.getSessionReference());
// only start the maintainer if we actually managed to create a session in the first
// place.
maintainer.start();
Expand Down Expand Up @@ -395,33 +407,62 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount
// Mark multiplexes sessions as unimplemented and fall back to regular sessions if
// UNIMPLEMENTED is returned.
maybeMarkUnimplemented(t);
initialSessionReferenceFuture.setException(t);
sessionReferenceFuture.setException(t);
}
});
maybeWaitForSessionCreation(
sessionClient.getSpanner().getOptions().getSessionPoolOptions(),
initialSessionReferenceFuture);
}

void setPool(SessionPool pool) {
this.pool = pool;
}

private static void maybeWaitForSessionCreation(
SessionPoolOptions sessionPoolOptions, ApiFuture<SessionReference> future) {
private void maybeWaitForSessionCreation(
SessionPoolOptions sessionPoolOptions,
SettableApiFuture<SessionReference> initialSessionReferenceFuture) {
Duration waitDuration = sessionPoolOptions.getWaitForMinSessions();
if (waitDuration != null && !waitDuration.isZero()) {
long timeoutMillis = waitDuration.toMillis();
try {
future.get(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (ExecutionException executionException) {
throw SpannerExceptionFactory.asSpannerException(executionException.getCause());
} catch (InterruptedException interruptedException) {
throw SpannerExceptionFactory.propagateInterrupt(interruptedException);
} catch (TimeoutException timeoutException) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.DEADLINE_EXCEEDED,
"Timed out after waiting " + timeoutMillis + "ms for multiplexed session creation");

SpannerException lastException = null;
SettableApiFuture<SessionReference> sessionReferenceFuture = initialSessionReferenceFuture;
Duration remainingTime;

Instant endTime = Instant.now().plus(waitDuration);
while ((remainingTime = Duration.between(Instant.now(), endTime)).toMillis() > 0) {
// If any exception is thrown, then retry the multiplexed session creation
if (sessionReferenceFuture == null) {
sessionReferenceFuture = SettableApiFuture.create();
asyncCreateMultiplexedSession(sessionReferenceFuture);
this.multiplexedSessionReference.set(sessionReferenceFuture);
}
try {
sessionReferenceFuture.get(remainingTime.toMillis(), TimeUnit.MILLISECONDS);
lastException = null;
break;
} catch (ExecutionException executionException) {
lastException = SpannerExceptionFactory.asSpannerException(executionException.getCause());
} catch (InterruptedException interruptedException) {
lastException = SpannerExceptionFactory.propagateInterrupt(interruptedException);
} catch (TimeoutException timeoutException) {
lastException =
SpannerExceptionFactory.newSpannerException(
ErrorCode.DEADLINE_EXCEEDED,
"Timed out after waiting "
+ waitDuration.toMillis()
+ "ms for multiplexed session creation");
}
// if any exception is thrown, then set the session reference to null to retry the
// multiplexed session creation only if the error code is DEADLINE EXCEEDED, UNAVAILABLE or
// RESOURCE_EXHAUSTED
if (RETRYABLE_ERROR_CODES.contains(lastException.getErrorCode())) {
sessionReferenceFuture = null;
} else {
break;
}
}
// if the wait time elapsed and multiplexed session fetch failed then throw the last exception
// that we have received
if (lastException != null) {
throw lastException;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,16 +542,16 @@ void simulateExecutionTime(
boolean stickyGlobalExceptions,
CountDownLatch freezeLock) {
Uninterruptibles.awaitUninterruptibly(freezeLock);
checkException(globalExceptions, stickyGlobalExceptions);
if (streamIndices.isEmpty()) {
checkException(this.exceptions, stickyException);
}
if (minimumExecutionTime > 0 || randomExecutionTime > 0) {
Uninterruptibles.sleepUninterruptibly(
(randomExecutionTime == 0 ? 0 : RANDOM.nextInt(randomExecutionTime))
+ minimumExecutionTime,
TimeUnit.MILLISECONDS);
}
checkException(globalExceptions, stickyGlobalExceptions);
if (streamIndices.isEmpty()) {
checkException(this.exceptions, stickyException);
}
}

private static void checkException(Queue<Exception> exceptions, boolean keepException) {
Expand Down
Loading
Loading