Skip to content

Commit de2a5f7

Browse files
committed
fix tests
1 parent 5de65b9 commit de2a5f7

File tree

2 files changed

+91
-78
lines changed

2 files changed

+91
-78
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java

Lines changed: 80 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -452,7 +452,21 @@ private void onSessionCreatedSuccessfully(SessionImpl session) {
452452

453453
/**
454454
* Called when multiplexed session creation fails. This method stores the error temporarily,
455-
* notifies waiting threads, and starts the maintainer for retry (unless UNIMPLEMENTED).
455+
* notifies waiting threads, and starts the maintainer for retry (unless it's a permanent error).
456+
*
457+
* <p>Permanent errors that should NOT be retried:
458+
*
459+
* <ul>
460+
* <li>UNIMPLEMENTED - multiplexed sessions are not supported
461+
* <li>DatabaseNotFoundException - the database doesn't exist
462+
* <li>InstanceNotFoundException - the instance doesn't exist
463+
* </ul>
464+
*
465+
* <p>Note: We do NOT set {@link #resourceNotFoundException} here because that field is used by
466+
* {@link #isValid()} to determine if the client should be recreated. Setting it during session
467+
* creation would cause {@link SpannerImpl#getDatabaseClient} to create a new client and retry,
468+
* which we don't want for permanent errors. Instead, we check if the error is a
469+
* ResourceNotFoundException when deciding whether to start the maintainer.
456470
*/
457471
private void onSessionCreationFailed(Throwable t) {
458472
creationLock.lock();
@@ -465,15 +479,27 @@ private void onSessionCreationFailed(Throwable t) {
465479
// Notify all waiting threads
466480
creationLatch.countDown();
467481
creationLatch = new CountDownLatch(1);
468-
// Start the maintainer even on failure (except for UNIMPLEMENTED) so it can retry
469-
if (!unimplemented.get() && !maintainer.isStarted()) {
482+
// Start the maintainer even on failure so it can retry, but NOT for permanent errors:
483+
// - UNIMPLEMENTED: multiplexed sessions are not supported
484+
// - ResourceNotFoundException: database or instance doesn't exist
485+
if (!unimplemented.get() && !isResourceNotFoundException(t) && !maintainer.isStarted()) {
470486
maintainer.start();
471487
}
472488
} finally {
473489
creationLock.unlock();
474490
}
475491
}
476492

493+
/**
494+
* Checks if the throwable is a {@link DatabaseNotFoundException} or {@link
495+
* InstanceNotFoundException}.
496+
*/
497+
private boolean isResourceNotFoundException(Throwable t) {
498+
SpannerException spannerException = SpannerExceptionFactory.asSpannerException(t);
499+
return spannerException instanceof DatabaseNotFoundException
500+
|| spannerException instanceof InstanceNotFoundException;
501+
}
502+
477503
/**
478504
* Waits for the initial session creation to complete if configured to do so. This method handles
479505
* the case where the session creation is still in progress or has failed.
@@ -504,6 +530,10 @@ private void maybeWaitForInitialSessionCreation(SessionPoolOptions sessionPoolOp
504530
* semantics: if no session exists and creation is not in progress, it triggers a new creation
505531
* attempt. If creation is in progress, it waits for the result.
506532
*
533+
* <p>This method uses a blocking lock (not tryLock) to ensure that the creationLatch is always
534+
* read while holding the lock, avoiding a race condition where a waiting thread could read an old
535+
* latch that has already been counted down and replaced.
536+
*
507537
* @return the session reference
508538
* @throws SpannerException if session creation fails
509539
*/
@@ -530,91 +560,66 @@ SessionReference getOrCreateSessionReference() {
530560
ErrorCode.UNIMPLEMENTED, "Multiplexed sessions are not supported");
531561
}
532562

533-
// Try to acquire the lock for creation
534-
if (creationLock.tryLock()) {
535-
try {
536-
// Double-check after acquiring lock
537-
sessionFuture = multiplexedSessionReference.get();
538-
if (sessionFuture != null) {
539-
try {
540-
return sessionFuture.get();
541-
} catch (ExecutionException | InterruptedException e) {
542-
throw SpannerExceptionFactory.asSpannerException(
543-
e.getCause() != null ? e.getCause() : e);
544-
}
545-
}
546-
547-
// Check if creation is already in progress
548-
if (creationInProgress.get()) {
549-
// Wait for the ongoing creation to complete
550-
creationLock.unlock();
551-
return waitForSessionCreation();
552-
}
553-
554-
// Start a new creation attempt
555-
creationInProgress.set(true);
556-
CountDownLatch currentLatch = creationLatch;
557-
creationLock.unlock();
558-
559-
// Trigger async session creation
560-
sessionClient.asyncCreateMultiplexedSession(
561-
new SessionConsumer() {
562-
@Override
563-
public void onSessionReady(SessionImpl session) {
564-
onSessionCreatedSuccessfully(session);
565-
}
566-
567-
@Override
568-
public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount) {
569-
onSessionCreationFailed(t);
570-
}
571-
});
563+
// Check if resource not found (database or instance) - don't retry in this case
564+
Throwable lastError = lastCreationError.get();
565+
if (lastError != null && isResourceNotFoundException(lastError)) {
566+
throw SpannerExceptionFactory.asSpannerException(lastError);
567+
}
572568

573-
// Wait for creation to complete
569+
// Use blocking lock to avoid race condition with latch replacement.
570+
// The latch must be read while holding the lock to ensure we wait on the correct latch.
571+
CountDownLatch latchToWaitOn;
572+
boolean amTheCreator = false;
573+
creationLock.lock();
574+
try {
575+
// Re-check state after acquiring lock
576+
sessionFuture = multiplexedSessionReference.get();
577+
if (sessionFuture != null) {
574578
try {
575-
currentLatch.await();
576-
} catch (InterruptedException e) {
577-
throw SpannerExceptionFactory.propagateInterrupt(e);
579+
return sessionFuture.get();
580+
} catch (ExecutionException | InterruptedException e) {
581+
throw SpannerExceptionFactory.asSpannerException(e.getCause() != null ? e.getCause() : e);
578582
}
583+
}
579584

580-
// Check result
581-
sessionFuture = multiplexedSessionReference.get();
582-
if (sessionFuture != null) {
583-
try {
584-
return sessionFuture.get();
585-
} catch (ExecutionException | InterruptedException e) {
586-
throw SpannerExceptionFactory.asSpannerException(
587-
e.getCause() != null ? e.getCause() : e);
588-
}
589-
}
585+
// Capture the current latch while holding the lock
586+
latchToWaitOn = creationLatch;
590587

591-
// Creation failed
592-
Throwable error = lastCreationError.get();
593-
if (error != null) {
594-
throw SpannerExceptionFactory.asSpannerException(error);
595-
}
596-
throw SpannerExceptionFactory.newSpannerException(
597-
ErrorCode.INTERNAL, "Failed to create multiplexed session");
598-
} finally {
599-
if (creationLock.isHeldByCurrentThread()) {
600-
creationLock.unlock();
601-
}
588+
if (!creationInProgress.get()) {
589+
// We are the creator
590+
amTheCreator = true;
591+
creationInProgress.set(true);
602592
}
603-
} else {
604-
// Another thread is creating, wait for it
605-
return waitForSessionCreation();
593+
// If creationInProgress is true, we are a waiter
594+
} finally {
595+
creationLock.unlock();
606596
}
607-
}
608597

609-
/** Waits for an ongoing session creation to complete and returns the result. */
610-
private SessionReference waitForSessionCreation() {
598+
if (amTheCreator) {
599+
// Trigger async session creation
600+
sessionClient.asyncCreateMultiplexedSession(
601+
new SessionConsumer() {
602+
@Override
603+
public void onSessionReady(SessionImpl session) {
604+
onSessionCreatedSuccessfully(session);
605+
}
606+
607+
@Override
608+
public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount) {
609+
onSessionCreationFailed(t);
610+
}
611+
});
612+
}
613+
614+
// Wait for creation to complete (both creator and waiters wait on the same latch)
611615
try {
612-
creationLatch.await();
616+
latchToWaitOn.await();
613617
} catch (InterruptedException e) {
614618
throw SpannerExceptionFactory.propagateInterrupt(e);
615619
}
616620

617-
ApiFuture<SessionReference> sessionFuture = multiplexedSessionReference.get();
621+
// Check result
622+
sessionFuture = multiplexedSessionReference.get();
618623
if (sessionFuture != null) {
619624
try {
620625
return sessionFuture.get();

google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ public void testMetricsWithGaxRetryUnaryRpc() {
264264
}
265265

266266
@Test
267-
public void testNoNetworkConnection() {
267+
public void testNoNetworkConnection() throws InterruptedException {
268268
assumeFalse(TestHelper.isMultiplexSessionDisabled());
269269
// Create a Spanner instance that tries to connect to a server that does not exist.
270270
// This simulates a bad network connection.
@@ -308,6 +308,11 @@ public void testNoNetworkConnection() {
308308
String instance = "i";
309309
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("test-project", instance, "d"));
310310

311+
// Wait for the initial async session creation to complete (fail).
312+
// This ensures deterministic behavior - the first creation will have failed by the time
313+
// we execute the query, so the query will trigger a retry attempt.
314+
Thread.sleep(100);
315+
311316
// Using this client will return UNAVAILABLE, as the server is not reachable and we have
312317
// disabled retries.
313318
SpannerException exception =
@@ -337,9 +342,12 @@ public void testNoNetworkConnection() {
337342
getMetricData(metricReader, BuiltInMetricsConstant.ATTEMPT_COUNT_NAME);
338343
assertNotNull(attemptCountMetricData);
339344

340-
// Attempt count should have a failed metric point for CreateSession.
345+
// Attempt count should have failed metric points for CreateSession.
346+
// With retry-on-access behavior, we expect 2 attempts:
347+
// 1. Initial async CreateSession during client construction
348+
// 2. Retry attempt when executeQuery().next() is called
341349
assertEquals(
342-
1, getAggregatedValue(attemptCountMetricData, expectedAttributesCreateSessionFailed), 0);
350+
2, getAggregatedValue(attemptCountMetricData, expectedAttributesCreateSessionFailed), 0);
343351
assertTrue(
344352
checkIfMetricExists(metricReader, BuiltInMetricsConstant.GFE_CONNECTIVITY_ERROR_NAME));
345353
assertTrue(

0 commit comments

Comments
 (0)