diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosNotFoundTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosNotFoundTests.java index 62ca8b35645e..1abf1a589200 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosNotFoundTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosNotFoundTests.java @@ -26,7 +26,6 @@ import reactor.core.publisher.Flux; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.UUID; @@ -37,7 +36,6 @@ public class CosmosNotFoundTests extends FaultInjectionTestBase { private static final Logger logger = LoggerFactory.getLogger(CosmosNotFoundTests.class); - private static final String thinClientEndpointIndicator = ":10250/"; private static final ImplementationBridgeHelpers.CosmosAsyncClientHelper.CosmosAsyncClientAccessor accessor = ImplementationBridgeHelpers.CosmosAsyncClientHelper.getCosmosAsyncClientAccessor(); @@ -602,30 +600,4 @@ public void performBulkOnDeletedContainerWithGatewayV2() throws InterruptedExcep // System.clearProperty("COSMOS.THINCLIENT_ENABLED"); } } - - private static void assertThinClientEndpointUsed(CosmosDiagnostics diagnostics) { - AssertionsForClassTypes.assertThat(diagnostics).isNotNull(); - - CosmosDiagnosticsContext ctx = diagnostics.getDiagnosticsContext(); - AssertionsForClassTypes.assertThat(ctx).isNotNull(); - - Collection requests = ctx.getRequestInfo(); - AssertionsForClassTypes.assertThat(requests).isNotNull(); - AssertionsForClassTypes.assertThat(requests.size()).isPositive(); - - for (CosmosDiagnosticsRequestInfo requestInfo : requests) { - logger.info( - "Endpoint: {}, RequestType: {}, Partition: {}/{}, ActivityId: {}", - requestInfo.getEndpoint(), - requestInfo.getRequestType(), - requestInfo.getPartitionId(), - requestInfo.getPartitionKeyRangeId(), - requestInfo.getActivityId()); - if 
(requestInfo.getEndpoint().contains(thinClientEndpointIndicator)) { - return; - } - } - - fail("No request targeting thin client proxy endpoint."); - } } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FITests_queryAfterCreation.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FITests_queryAfterCreation.java index b92fa8ee2c9a..b3a9e5941e30 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FITests_queryAfterCreation.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FITests_queryAfterCreation.java @@ -15,7 +15,7 @@ public class FITests_queryAfterCreation extends FaultInjectionWithAvailabilityStrategyTestsBase { - @Test(groups = {"fi-multi-master"}, dataProvider = "testConfigs_queryAfterCreation", retryAnalyzer = SuperFlakyTestRetryAnalyzer.class) + @Test(groups = {"fi-multi-master", "fi-thinclient-multi-master"}, dataProvider = "testConfigs_queryAfterCreation", retryAnalyzer = SuperFlakyTestRetryAnalyzer.class) public void queryAfterCreation( String testCaseId, Duration endToEndTimeout, diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FITests_readAfterCreation.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FITests_readAfterCreation.java index 4dc58672ce15..efc7f68dc864 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FITests_readAfterCreation.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FITests_readAfterCreation.java @@ -16,7 +16,7 @@ public class FITests_readAfterCreation extends FaultInjectionWithAvailabilityStrategyTestsBase { - @Test(groups = {"fi-multi-master"}, dataProvider = "testConfigs_readAfterCreation", retryAnalyzer = SuperFlakyTestRetryAnalyzer.class) + @Test(groups = {"fi-multi-master", "fi-thinclient-multi-master"}, dataProvider = "testConfigs_readAfterCreation", retryAnalyzer = SuperFlakyTestRetryAnalyzer.class) public void readAfterCreation( String testCaseId, 
Duration endToEndTimeout, diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FITests_readAllAfterCreation.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FITests_readAllAfterCreation.java index 0d8605093aa1..aa37a47ae29e 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FITests_readAllAfterCreation.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FITests_readAllAfterCreation.java @@ -12,7 +12,7 @@ import java.util.function.Function; public class FITests_readAllAfterCreation extends FaultInjectionWithAvailabilityStrategyTestsBase { - @Test(groups = {"fi-multi-master"}, dataProvider = "testConfigs_readAllAfterCreation", retryAnalyzer = FlakyTestRetryAnalyzer.class) + @Test(groups = {"fi-multi-master", "fi-thinclient-multi-master"}, dataProvider = "testConfigs_readAllAfterCreation", retryAnalyzer = FlakyTestRetryAnalyzer.class) public void readAllAfterCreation( String testCaseId, Duration endToEndTimeout, diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FITests_readManyAfterCreation.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FITests_readManyAfterCreation.java index cfc3146447fc..dbe56180efcb 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FITests_readManyAfterCreation.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FITests_readManyAfterCreation.java @@ -17,7 +17,7 @@ public class FITests_readManyAfterCreation extends FaultInjectionWithAvailabilityStrategyTestsBase { - @Test(groups = {"fi-multi-master"}, dataProvider = "testConfigs_readManyAfterCreation", retryAnalyzer = FlakyTestRetryAnalyzer.class) + @Test(groups = {"fi-multi-master", "fi-thinclient-multi-master"}, dataProvider = "testConfigs_readManyAfterCreation", retryAnalyzer = FlakyTestRetryAnalyzer.class) public void readManyAfterCreation( String testCaseId, Duration endToEndTimeout, diff --git 
a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FITests_writeAfterCreate.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FITests_writeAfterCreate.java index 24ee7870024f..853871aa3aef 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FITests_writeAfterCreate.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FITests_writeAfterCreate.java @@ -14,7 +14,7 @@ public class FITests_writeAfterCreate extends FaultInjectionWithAvailabilityStrategyTestsBase { - @Test(groups = {"fi-multi-master"}, dataProvider = "testConfigs_writeAfterCreation", retryAnalyzer = FlakyTestRetryAnalyzer.class) + @Test(groups = {"fi-multi-master", "fi-thinclient-multi-master"}, dataProvider = "testConfigs_writeAfterCreation", retryAnalyzer = FlakyTestRetryAnalyzer.class) public void writeAfterCreation( String testCaseId, Duration endToEndTimeout, diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FaultInjectionWithAvailabilityStrategyTestsBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FaultInjectionWithAvailabilityStrategyTestsBase.java index c6b858ae9430..dbb158438a9e 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FaultInjectionWithAvailabilityStrategyTestsBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FaultInjectionWithAvailabilityStrategyTestsBase.java @@ -18,6 +18,7 @@ import com.azure.cosmos.implementation.apachecommons.lang.tuple.ImmutablePair; import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair; import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils; +import org.testng.SkipException; import com.azure.cosmos.models.CosmosClientTelemetryConfig; import com.azure.cosmos.models.CosmosContainerProperties; import com.azure.cosmos.models.CosmosItemIdentity; @@ -204,7 +205,7 @@ public String resolveTestNameSuffix(Object[] row) { return (String)row[0]; } - @BeforeClass(groups = { 
"fi-multi-master" }) + @BeforeClass(groups = { "fi-multi-master", "fi-thinclient-multi-master" }) public void beforeClass() { CosmosClientBuilder clientBuilder = new CosmosClientBuilder() .endpoint(TestConfigurations.HOST) @@ -337,7 +338,7 @@ public void beforeClass() { safeClose(dummyClient); } } - @AfterClass(groups = { "fi-multi-master" }) + @AfterClass(groups = { "fi-multi-master", "fi-thinclient-multi-master" }) public void afterClass() { CosmosClientBuilder clientBuilder = new CosmosClientBuilder() .endpoint(TestConfigurations.HOST) @@ -4882,14 +4883,29 @@ protected void execute( ConnectionMode connectionMode, boolean shouldInjectPreferredRegionsInClient) { + // When thin client + HTTP/2 are enabled, all requests route through the thin client + // gateway proxy — DIRECT mode is not exercised. Skip DIRECT mode tests. + if (Configs.isThinClientEnabled() && Configs.isHttp2Enabled() && connectionMode == ConnectionMode.DIRECT) { + throw new SkipException( + "Skipping DIRECT mode test '" + testCaseId + "' — thin client forces GATEWAY mode"); + } + // Test two cases here: // - the endToEndOperationLatencyPolicyConfig is being configured on the client only // - the endToEndOperationLatencyPolicyConfig is being configured on the request options only for (boolean e2eTimeoutPolicyOnClient : Arrays.asList(Boolean.TRUE, Boolean.FALSE)) { logger.info("START {}, e2eTimeoutPolicyOnClient {}", testCaseId, e2eTimeoutPolicyOnClient); + // Thin client adds ~500ms overhead for container + partition key range cache lookups + // through the RNTBD-encoded thin client proxy path. Increase e2e timeout to avoid + // spurious 408 (OperationCancelled) failures with tight timeouts. 
+ Duration effectiveEndToEndTimeout = endToEndTimeout; + if (Configs.isThinClientEnabled() && Configs.isHttp2Enabled() && endToEndTimeout != null) { + effectiveEndToEndTimeout = endToEndTimeout.plusMillis(500); + } + CosmosEndToEndOperationLatencyPolicyConfigBuilder e2ePolicyBuilder = - new CosmosEndToEndOperationLatencyPolicyConfigBuilder(endToEndTimeout) + new CosmosEndToEndOperationLatencyPolicyConfigBuilder(effectiveEndToEndTimeout) .enable(true); CosmosEndToEndOperationLatencyPolicyConfig endToEndOperationLatencyPolicyConfig = availabilityStrategy != null @@ -5017,6 +5033,14 @@ protected void execute( ctxValidation.accept(currentCtx); } } + + // When thin client + HTTP/2 are enabled (fi-thinclient-multi-master / fi-thinclient-multi-region) + // and connection mode is GATEWAY, validate that requests targeted the thin client proxy endpoint + if (Configs.isThinClientEnabled() && Configs.isHttp2Enabled() && connectionMode == ConnectionMode.GATEWAY) { + for (CosmosDiagnosticsContext diagnosticsContext : diagnosticsContexts) { + assertThinClientEndpointUsed(diagnosticsContext); + } + } } catch (Exception e) { if (e instanceof CosmosException) { CosmosException cosmosException = Utils.as(e, CosmosException.class); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/PerPartitionAutomaticFailoverE2ETests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/PerPartitionAutomaticFailoverE2ETests.java index 92d21daf488b..19bd22d6f570 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/PerPartitionAutomaticFailoverE2ETests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/PerPartitionAutomaticFailoverE2ETests.java @@ -464,7 +464,7 @@ public Object[][] ppafDynamicEnablement503Only() { }; } - @BeforeClass(groups = {"multi-region"}) + @BeforeClass(groups = {"multi-region", "fi-thinclient-multi-region"}) public void beforeClass() { this.sharedClient = getClientBuilder().buildAsyncClient(); @@ 
-484,7 +484,7 @@ public void beforeClass() { this.accountLevelLocationReadableLocationContext = getAccountLevelLocationContext(databaseAccountSnapshot, false); } - @AfterClass(groups = {"multi-region"}) + @AfterClass(groups = {"multi-region", "fi-thinclient-multi-region"}) public void afterClass() throws InterruptedException { safeClose(this.sharedClient); System.gc(); @@ -1258,7 +1258,7 @@ public Object[][] ppafTestConfigsWithWriteOps() { *
<li>Success vs failure based on phase and configuration.</li>
  • * */ - @Test(groups = {"multi-region"}, dataProvider = "ppafTestConfigsWithWriteOps") + @Test(groups = {"multi-region", "fi-thinclient-multi-region"}, dataProvider = "ppafTestConfigsWithWriteOps") public void testPpafWithWriteFailoverWithEligibleErrorStatusCodes( String testType, OperationType operationType, @@ -1279,6 +1279,11 @@ public void testPpafWithWriteFailoverWithEligibleErrorStatusCodes( throw new SkipException(String.format("Test with type : %s not eligible for specified connection mode %s.", testType, connectionMode)); } + // Thin client only supports GATEWAY mode - skip DIRECT mode tests + if (connectionMode == ConnectionMode.DIRECT && Configs.isThinClientEnabled() && Configs.isHttp2Enabled()) { + throw new SkipException("DIRECT connection mode is not supported with thin client - skipping."); + } + if (connectionMode == ConnectionMode.DIRECT) { TransportClient transportClientMock = Mockito.mock(TransportClient.class); @@ -1439,10 +1444,20 @@ public void testPpafWithWriteFailoverWithEligibleErrorStatusCodes( assertThat(preferredRegions.size()).isGreaterThanOrEqualTo(1); String regionWithIssues = preferredRegions.get(0); - URI locationEndpointWithIssues = new URI(readableRegionNameToEndpoint.get(regionWithIssues) + "dbs/" + this.sharedDatabase.getId() + "/colls/" + this.sharedSinglePartitionContainer.getId() + "/docs"); + String baseEndpoint = readableRegionNameToEndpoint.get(regionWithIssues); + + URI locationEndpointWithIssues = new URI(baseEndpoint + "dbs/" + this.sharedDatabase.getId() + "/colls/" + this.sharedSinglePartitionContainer.getId() + "/docs"); ReflectionUtils.setGatewayHttpClient(rxStoreModel, mockedHttpClient); + // When thin client is enabled, data requests route through thinProxy (ThinClientStoreModel) + // which uses RNTBD binary encoding — incompatible with standard HTTP mock responses. + // Replace thinProxy with gatewayProxy so data requests use the same mocked HttpClient + // with standard HTTP encoding. 
PPAF retry/failover logic is transport-agnostic. + if (Configs.isThinClientEnabled() && Configs.isHttp2Enabled()) { + ReflectionUtils.setThinProxy(rxDocumentClient, rxStoreModel); + } + setupHttpClientToReturnSuccessResponse(mockedHttpClient, operationType, databaseAccount, successStatusCode); CosmosException cosmosException = createCosmosException( @@ -1521,7 +1536,7 @@ public void testPpafWithWriteFailoverWithEligibleErrorStatusCodes( *

<p>Expectations are provided by the data provider: when disabled, the request should not succeed; * when enabled, it should succeed. Works for both DIRECT and GATEWAY connection modes.</p>

    */ - @Test(groups = {"multi-region"}, dataProvider = "ppafDynamicEnablement503Only") + @Test(groups = {"multi-region", "fi-thinclient-multi-region"}, dataProvider = "ppafDynamicEnablement503Only") public void testPpafWithWriteFailoverWithEligibleErrorStatusCodesWithPpafDynamicEnablement( String testType, OperationType operationType, @@ -1542,6 +1557,11 @@ public void testPpafWithWriteFailoverWithEligibleErrorStatusCodesWithPpafDynamic throw new SkipException(String.format("Test with type : %s not eligible for specified connection mode %s.", testType, connectionMode)); } + // Thin client only supports GATEWAY mode - skip DIRECT mode tests + if (connectionMode == ConnectionMode.DIRECT && Configs.isThinClientEnabled() && Configs.isHttp2Enabled()) { + throw new SkipException("DIRECT connection mode is not supported with thin client - skipping."); + } + // DIRECT flow: swap transport client, inject error for primary region/PK range, and verify phase-by-phase if (connectionMode == ConnectionMode.DIRECT) { TransportClient transportClientMock = Mockito.mock(TransportClient.class); @@ -1690,11 +1710,21 @@ public void testPpafWithWriteFailoverWithEligibleErrorStatusCodesWithPpafDynamic assertThat(preferredRegions.size()).isGreaterThanOrEqualTo(1); String regionWithIssues = preferredRegions.get(0); - URI locationEndpointWithIssues = new URI(readableRegionNameToEndpoint.get(regionWithIssues) + "dbs/" + this.sharedDatabase.getId() + "/colls/" + this.sharedSinglePartitionContainer.getId() + "/docs"); + String baseEndpoint = readableRegionNameToEndpoint.get(regionWithIssues); + + URI locationEndpointWithIssues = new URI(baseEndpoint + "dbs/" + this.sharedDatabase.getId() + "/colls/" + this.sharedSinglePartitionContainer.getId() + "/docs"); // Redirect gateway calls through our mocked HttpClient ReflectionUtils.setGatewayHttpClient(rxStoreModel, mockedHttpClient); + // When thin client is enabled, data requests route through thinProxy (ThinClientStoreModel) + // which uses 
RNTBD binary encoding — incompatible with standard HTTP mock responses. + // Replace thinProxy with gatewayProxy so data requests use the same mocked HttpClient + // with standard HTTP encoding. PPAF retry/failover logic is transport-agnostic. + if (Configs.isThinClientEnabled() && Configs.isHttp2Enabled()) { + ReflectionUtils.setThinProxy(rxDocumentClient, rxStoreModel); + } + setupHttpClientToReturnSuccessResponse(mockedHttpClient, operationType, databaseAccountForResponses, successStatusCode); CosmosException cosmosException = createCosmosException( @@ -1788,7 +1818,7 @@ public void testPpafWithWriteFailoverWithEligibleErrorStatusCodesWithPpafDynamic *

<p>Dynamic enablement is achieved by overriding GlobalEndpointManager's owner to * inject the PPAF flag into DatabaseAccount snapshots.</p>

    */ - @Test(groups = {"multi-region"}, dataProvider = "ppafNonWriteDynamicEnablementScenarios") + @Test(groups = {"multi-region", "fi-thinclient-multi-region"}, dataProvider = "ppafNonWriteDynamicEnablementScenarios") public void testFailoverBehaviorForNonWriteOperationsWithPpafDynamicEnablement( String testType, OperationType operationType, @@ -1804,6 +1834,11 @@ public void testFailoverBehaviorForNonWriteOperationsWithPpafDynamicEnablement( throw new SkipException(String.format("Test with type : %s not eligible for specified connection mode %s.", testType, connectionMode)); } + // Thin client only supports GATEWAY mode - skip DIRECT mode tests + if (connectionMode == ConnectionMode.DIRECT && Configs.isThinClientEnabled() && Configs.isHttp2Enabled()) { + throw new SkipException("DIRECT connection mode is not supported with thin client - skipping."); + } + final int consecutiveFaults = 10; // ===================== DIRECT MODE PATH ===================== @@ -2060,6 +2095,15 @@ public void testFailoverBehaviorForNonWriteOperationsWithPpafDynamicEnablement( expectedDuringWindow, expectedAfterWindow); + // Validate thin client endpoint was used when thin client is enabled + if (Configs.isThinClientEnabled() && Configs.isHttp2Enabled()) { + ResponseWrapper probeResponse = dataPlaneOperation.apply(params); + CosmosDiagnostics diag = extractDiagnostics(probeResponse); + if (diag != null) { + assertThinClientEndpointUsed(diag.getDiagnosticsContext()); + } + } + } catch (Exception e) { Assertions.fail("The test ran into an exception {}", e); } finally { @@ -2089,6 +2133,19 @@ private void runHedgingPhasesForNonWrite( this.validateExpectedResponseCharacteristics.accept(postWindow, expectedAfterWindow); } + private static CosmosDiagnostics extractDiagnostics(ResponseWrapper response) { + if (response.cosmosItemResponse != null) { + return response.cosmosItemResponse.getDiagnostics(); + } else if (response.feedResponse != null) { + return 
response.feedResponse.getCosmosDiagnostics(); + } else if (response.cosmosException != null) { + return response.cosmosException.getDiagnostics(); + } else if (response.batchResponse != null) { + return response.batchResponse.getDiagnostics(); + } + return null; + } + private static class DelegatingDatabaseAccountManagerInternal implements DatabaseAccountManagerInternal { private final DatabaseAccountManagerInternal delegate; private final AtomicReference ppafEnabledRef; diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/PerPartitionCircuitBreakerE2ETests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/PerPartitionCircuitBreakerE2ETests.java index 57b6810a780d..fd856429841e 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/PerPartitionCircuitBreakerE2ETests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/PerPartitionCircuitBreakerE2ETests.java @@ -4,6 +4,7 @@ package com.azure.cosmos; import com.azure.cosmos.faultinjection.FaultInjectionTestBase; +import com.azure.cosmos.implementation.Configs; import com.azure.cosmos.implementation.ConnectionPolicy; import com.azure.cosmos.implementation.DatabaseAccount; import com.azure.cosmos.implementation.DatabaseAccountLocation; @@ -237,7 +238,7 @@ public PerPartitionCircuitBreakerE2ETests(CosmosClientBuilder cosmosClientBuilde super(cosmosClientBuilder); } - @BeforeClass(groups = {"circuit-breaker-misc-gateway", "circuit-breaker-misc-direct", "circuit-breaker-read-all-read-many", "multi-region"}) + @BeforeClass(groups = {"circuit-breaker-misc-gateway", "circuit-breaker-misc-direct", "circuit-breaker-read-all-read-many", "multi-region", "fi-thinclient-multi-master"}) public void beforeClass() { try (CosmosAsyncClient testClient = getClientBuilder().buildAsyncClient()) { RxDocumentClientImpl documentClient = (RxDocumentClientImpl) ReflectionUtils.getAsyncDocumentClient(testClient); @@ -2800,7 +2801,7 @@ public void 
miscellaneousDocumentOperationHitsTerminalExceptionAcrossKRegionsDir // Added FlakyTestRetryAnalyzer to handle transient failures in circuit breaker tests with fault injection // Increased timeout from 4*TIMEOUT to 5*TIMEOUT (200 seconds) to allow for timing variations in CI - @Test(groups = {"circuit-breaker-misc-gateway"}, dataProvider = "miscellaneousOpTestConfigsGateway", timeOut = 5 * TIMEOUT, retryAnalyzer = FlakyTestRetryAnalyzer.class) + @Test(groups = {"circuit-breaker-misc-gateway", "fi-thinclient-multi-master"}, dataProvider = "miscellaneousOpTestConfigsGateway", timeOut = 5 * TIMEOUT, retryAnalyzer = FlakyTestRetryAnalyzer.class) public void miscellaneousDocumentOperationHitsTerminalExceptionAcrossKRegionsGateway( String testId, FaultInjectionRuleParamsWrapper faultInjectionRuleParamsWrapper, @@ -2906,6 +2907,11 @@ private void executeMiscOperationHitsTerminalExceptionAcrossKRegions( throw new SkipException(String.format("Test is not applicable to %s connectivity mode!", connectionPolicy.getConnectionMode())); } + // Thin client only supports GATEWAY mode - skip DIRECT mode tests + if (connectionPolicy.getConnectionMode() == ConnectionMode.DIRECT && Configs.isThinClientEnabled() && Configs.isHttp2Enabled()) { + throw new SkipException("DIRECT connection mode is not supported with thin client - skipping."); + } + CosmosAsyncClient asyncClient = null; FaultInjectionOperationType faultInjectionOperationType = faultInjectionRuleParamsWrapper.getFaultInjectionOperationType(); faultInjectionRuleParamsWrapper.withFaultInjectionConnectionType(evaluateFaultInjectionConnectionType(connectionPolicy.getConnectionMode())); @@ -3650,6 +3656,13 @@ private void execute( validateRegionsContactedWhenShortCircuitingHasKickedIn.accept(response.batchResponse.getDiagnostics().getDiagnosticsContext()); } } + + if (Configs.isThinClientEnabled() && Configs.isHttp2Enabled() && response.cosmosException == null) { + CosmosDiagnosticsContext ctx = getDiagnosticsContext(response); + 
if (ctx != null) { + assertThinClientEndpointUsed(ctx); + } + } } // Ensure circuit breaker has kicked in before fail back @@ -3692,6 +3705,13 @@ private void execute( validateRegionsContactedWhenShortCircuitRegionMarkedAsHealthyOrHealthyTentative.accept(response.batchResponse.getDiagnostics().getDiagnosticsContext()); } + + if (Configs.isThinClientEnabled() && Configs.isHttp2Enabled() && response.cosmosException == null) { + CosmosDiagnosticsContext ctx = getDiagnosticsContext(response); + if (ctx != null) { + assertThinClientEndpointUsed(ctx); + } + } } } } catch (InterruptedException ex) { @@ -3705,6 +3725,19 @@ private void execute( } } + private static CosmosDiagnosticsContext getDiagnosticsContext(ResponseWrapper response) { + if (response.cosmosItemResponse != null) { + return response.cosmosItemResponse.getDiagnostics().getDiagnosticsContext(); + } else if (response.feedResponse != null) { + return response.feedResponse.getCosmosDiagnostics().getDiagnosticsContext(); + } else if (response.cosmosException != null) { + return response.cosmosException.getDiagnostics().getDiagnosticsContext(); + } else if (response.batchResponse != null) { + return response.batchResponse.getDiagnostics().getDiagnosticsContext(); + } + return null; + } + private static int resolveTestObjectCountToBootstrapFrom(FaultInjectionOperationType faultInjectionOperationType, int opCount) { switch (faultInjectionOperationType) { case READ_ITEM: @@ -4529,18 +4562,18 @@ private String resolveContainerIdByFaultInjectionOperationType(FaultInjectionOpe } } - @BeforeMethod(groups = { "circuit-breaker-misc-gateway", "circuit-breaker-misc-direct", "circuit-breaker-read-all-read-many", "multi-region" }, timeOut = 2 * SETUP_TIMEOUT, alwaysRun = true) + @BeforeMethod(groups = { "circuit-breaker-misc-gateway", "circuit-breaker-misc-direct", "circuit-breaker-read-all-read-many", "multi-region", "fi-thinclient-multi-master" }, timeOut = 2 * SETUP_TIMEOUT, alwaysRun = true) public void beforeMethod() 
throws Exception { // add a cool off time CosmosNettyLeakDetectorFactory.resetIdentifiedLeaks(); } - @AfterMethod(groups = { "circuit-breaker-misc-gateway", "circuit-breaker-misc-direct", "circuit-breaker-read-all-read-many", "multi-region" }, timeOut = SETUP_TIMEOUT, alwaysRun = true) + @AfterMethod(groups = { "circuit-breaker-misc-gateway", "circuit-breaker-misc-direct", "circuit-breaker-read-all-read-many", "multi-region", "fi-thinclient-multi-master" }, timeOut = SETUP_TIMEOUT, alwaysRun = true) public void afterMethod() throws Exception { logger.info("captureNettyLeaks: {}", captureNettyLeaks()); } - @AfterClass(groups = { "circuit-breaker-misc-gateway", "circuit-breaker-misc-direct", "circuit-breaker-read-all-read-many", "multi-region" }) + @AfterClass(groups = { "circuit-breaker-misc-gateway", "circuit-breaker-misc-direct", "circuit-breaker-read-all-read-many", "multi-region", "fi-thinclient-multi-master" }) public void afterClass() { CosmosClientBuilder clientBuilder = new CosmosClientBuilder() .endpoint(TestConfigurations.HOST) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnGatewayV2Tests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnGatewayV2Tests.java index 36854e42bec6..b018161dcd0f 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnGatewayV2Tests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnGatewayV2Tests.java @@ -9,10 +9,12 @@ import com.azure.cosmos.CosmosClientBuilder; import com.azure.cosmos.CosmosDiagnostics; import com.azure.cosmos.CosmosDiagnosticsContext; -import com.azure.cosmos.CosmosDiagnosticsRequestInfo; +import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfig; +import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfigBuilder; import 
com.azure.cosmos.CosmosException; import com.azure.cosmos.FlakyTestRetryAnalyzer; import com.azure.cosmos.TestObject; +import com.azure.cosmos.ThresholdBasedAvailabilityStrategy; import com.azure.cosmos.implementation.AsyncDocumentClient; import com.azure.cosmos.implementation.DatabaseAccount; import com.azure.cosmos.implementation.DatabaseAccountLocation; @@ -25,6 +27,7 @@ import com.azure.cosmos.models.CosmosItemResponse; import com.azure.cosmos.models.CosmosQueryRequestOptions; import com.azure.cosmos.models.FeedRange; +import com.azure.cosmos.models.FeedResponse; import com.azure.cosmos.models.PartitionKey; import com.azure.cosmos.test.faultinjection.CosmosFaultInjectionHelper; import com.azure.cosmos.test.faultinjection.FaultInjectionConditionBuilder; @@ -41,7 +44,7 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import org.assertj.core.api.AssertionsForClassTypes; -import org.assertj.core.api.Fail; +import org.testng.SkipException; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; @@ -51,7 +54,6 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -69,8 +71,6 @@ public class FaultInjectionServerErrorRuleOnGatewayV2Tests extends FaultInjectio private Map readRegionMap; private DatabaseAccount databaseAccount; - private static final String thinClientEndpointIndicator = ":10250/"; - private static final String FAULT_INJECTION_RULE_NON_APPLICABLE_ADDRESS = "Addresses mismatch"; private static final String FAULT_INJECTION_RULE_NON_APPLICABLE_HIT_LIMIT = "Hit Limit reached"; private static final String FAULT_INJECTION_RULE_NON_APPLICABLE_REGION_ENDPOINT = "RegionalRoutingContext mismatch"; @@ -555,6 +555,79 @@ public void faultInjectionServerErrorRuleTests_ServerResponseDelay( } } + 
@Test(groups = {"fi-thinclient-multi-master"}, timeOut = 4 * TIMEOUT, retryAnalyzer = FlakyTestRetryAnalyzer.class) + public void faultInjectionServerErrorRuleTests_PkScopedQueryWithHedging() { + // Regression test for: RxDocumentServiceRequest.clone() drops partitionKeyDefinition + // When hedging fires for a PK-scoped query on GW V2, the cloned request NPEs in + // ThinClientStoreModel.wrapInHttpRequest because partitionKeyDefinition is null. + + if (this.preferredReadRegions.size() < 2) { + throw new SkipException("Need >= 2 regions for hedging"); + } + + String firstRegion = this.preferredReadRegions.get(0); + + // Inject response delay ONLY in primary region to force hedging to secondary + String ruleId = "pkScopedQueryHedge-" + UUID.randomUUID(); + FaultInjectionRule responseDelayRule = + new FaultInjectionRuleBuilder(ruleId) + .condition( + new FaultInjectionConditionBuilder() + .connectionType(FaultInjectionConnectionType.GATEWAY) + .operationType(FaultInjectionOperationType.QUERY_ITEM) + .endpoints( + new FaultInjectionEndpointBuilder(FeedRange.forFullRange()) + .replicaCount(4) + .build()) + .region(firstRegion) + .build() + ) + .result( + FaultInjectionResultBuilders + .getResultBuilder(FaultInjectionServerErrorType.RESPONSE_DELAY) + .times(4) + .delay(Duration.ofSeconds(6)) + .build() + ) + .duration(Duration.ofMinutes(1)) + .build(); + + try { + TestObject createdItem = TestObject.create(); + this.cosmosAsyncContainer.createItem(createdItem).block(); + + CosmosFaultInjectionHelper + .configureFaultInjectionRules(this.cosmosAsyncContainer, Arrays.asList(responseDelayRule)) + .block(); + + // PK-scoped query with availability strategy — this triggers hedging + clone() + CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions(); + queryOptions.setPartitionKey(new PartitionKey(createdItem.getId())); + + CosmosEndToEndOperationLatencyPolicyConfig e2ePolicy = + new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(30)) + 
.availabilityStrategy(new ThresholdBasedAvailabilityStrategy( + Duration.ofMillis(500), Duration.ofMillis(500))) + .build(); + queryOptions.setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicy); + + String query = String.format("SELECT * from c where c.id = '%s'", createdItem.getId()); + + // Without the clone() fix this would NPE: + // "partitionKeyDefinition" is null in ThinClientStoreModel.wrapInHttpRequest + FeedResponse response = this.cosmosAsyncContainer + .queryItems(query, queryOptions, TestObject.class) + .byPage() + .blockFirst(); + + AssertionsForClassTypes.assertThat(response).isNotNull(); + AssertionsForClassTypes.assertThat(response.getResults().size()).isGreaterThanOrEqualTo(1); + assertThinClientEndpointUsed(response.getCosmosDiagnostics()); + } finally { + responseDelayRule.disable(); + } + } + @Test(groups = {"fi-thinclient-multi-master"}, timeOut = 4 * TIMEOUT, retryAnalyzer = FlakyTestRetryAnalyzer.class) public void faultInjectionServerErrorRuleTests_ServerConnectionDelay() throws JsonProcessingException { // simulate high channel acquisition/connectionTimeout @@ -716,32 +789,6 @@ private static void validate(AccountLevelLocationContext accountLevelLocationCon } } - private static void assertThinClientEndpointUsed(CosmosDiagnostics diagnostics) { - AssertionsForClassTypes.assertThat(diagnostics).isNotNull(); - - CosmosDiagnosticsContext ctx = diagnostics.getDiagnosticsContext(); - AssertionsForClassTypes.assertThat(ctx).isNotNull(); - - Collection requests = ctx.getRequestInfo(); - AssertionsForClassTypes.assertThat(requests).isNotNull(); - AssertionsForClassTypes.assertThat(requests.size()).isPositive(); - - for (CosmosDiagnosticsRequestInfo requestInfo : requests) { - logger.info( - "Endpoint: {}, RequestType: {}, Partition: {}/{}, ActivityId: {}", - requestInfo.getEndpoint(), - requestInfo.getRequestType(), - requestInfo.getPartitionId(), - requestInfo.getPartitionKeyRangeId(), - requestInfo.getActivityId()); - if 
(requestInfo.getEndpoint().contains(thinClientEndpointIndicator)) { - return; - } - } - - Fail.fail("No request targeting thin client proxy endpoint."); - } - private static class AccountLevelLocationContext { private final List serviceOrderedReadableRegions; private final List serviceOrderedWriteableRegions; diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ThinClientStoreModelTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ThinClientStoreModelTest.java index 64cd7fe37115..93589df24135 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ThinClientStoreModelTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ThinClientStoreModelTest.java @@ -2,14 +2,18 @@ import com.azure.cosmos.ConsistencyLevel; import com.azure.cosmos.implementation.http.HttpClient; +import com.azure.cosmos.implementation.routing.PartitionKeyInternal; import com.azure.cosmos.implementation.routing.RegionalRoutingContext; +import com.azure.cosmos.models.PartitionKeyDefinition; import io.netty.channel.ConnectTimeoutException; import org.mockito.Mockito; import org.testng.annotations.Test; import reactor.core.publisher.Mono; import java.net.URI; +import java.util.Collections; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; public class ThinClientStoreModelTest { @@ -58,4 +62,30 @@ public void testThinClientStoreModel() throws Exception { //no-op } } + + @Test(groups = "unit") + public void cloneShouldPreservePartitionKeyDefinition() { + DiagnosticsClientContext clientContext = Mockito.mock(DiagnosticsClientContext.class); + Mockito.doReturn(new DiagnosticsClientContext.DiagnosticsClientConfig()).when(clientContext).getConfig(); + + RxDocumentServiceRequest request = RxDocumentServiceRequest.createFromName( + clientContext, + OperationType.Query, + "/dbs/db1/colls/c1", + 
ResourceType.Document); + + PartitionKeyDefinition pkDef = new PartitionKeyDefinition(); + pkDef.setPaths(Collections.singletonList("/partitionKey")); + + request.setPartitionKeyInternal(PartitionKeyInternal.fromObjectArray(Collections.singletonList("testPk"), true)); + request.setPartitionKeyDefinition(pkDef); + + RxDocumentServiceRequest cloned = request.clone(); + + assertThat(cloned.getPartitionKeyInternal()).isNotNull(); + assertThat(cloned.getPartitionKeyDefinition()) + .as("clone() must preserve partitionKeyDefinition for GW V2 EPK computation") + .isNotNull(); + assertThat(cloned.getPartitionKeyDefinition().getPaths()).containsExactly("/partitionKey"); + } } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/routing/RegionalRoutingContextTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/routing/RegionalRoutingContextTest.java new file mode 100644 index 000000000000..58a7956a2da3 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/routing/RegionalRoutingContextTest.java @@ -0,0 +1,64 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. 
+ +package com.azure.cosmos.implementation.routing; + +import org.testng.annotations.Test; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +public class RegionalRoutingContextTest { + + @Test(groups = "unit") + public void equalsShouldIgnoreThinclientEndpoint() throws URISyntaxException { + URI gw = new URI("https://region1.documents.azure.com:443/"); + URI tc = new URI("https://region1.documents.azure.com:10250/"); + + RegionalRoutingContext keyOnly = new RegionalRoutingContext(gw); + RegionalRoutingContext full = new RegionalRoutingContext(gw); + full.setThinclientRegionalEndpoint(tc); + + assertThat(keyOnly).isEqualTo(full); + assertThat(keyOnly.hashCode()).isEqualTo(full.hashCode()); + } + + @Test(groups = "unit") + public void equalsShouldDifferForDifferentGateways() throws URISyntaxException { + URI gw1 = new URI("https://region1.documents.azure.com:443/"); + URI gw2 = new URI("https://region2.documents.azure.com:443/"); + + RegionalRoutingContext ctx1 = new RegionalRoutingContext(gw1); + RegionalRoutingContext ctx2 = new RegionalRoutingContext(gw2); + + assertThat(ctx1).isNotEqualTo(ctx2); + assertThat(ctx1.hashCode()).isNotEqualTo(ctx2.hashCode()); + } + + @Test(groups = "unit") + public void hashMapLookupWithGatewayOnlyKey() throws URISyntaxException { + URI gw = new URI("https://region1.documents.azure.com:443/"); + URI tc = new URI("https://region1.documents.azure.com:10250/"); + + RegionalRoutingContext keyOnly = new RegionalRoutingContext(gw); + RegionalRoutingContext full = new RegionalRoutingContext(gw); + full.setThinclientRegionalEndpoint(tc); + + Map map = new HashMap<>(); + map.put(full, "region1"); + + assertThat(map.get(keyOnly)).isEqualTo("region1"); + } + + @Test(groups = "unit") + public void toStringShouldContainGatewayEndpoint() throws URISyntaxException { + URI gw = new 
URI("https://region1.documents.azure.com:443/"); + RegionalRoutingContext ctx = new RegionalRoutingContext(gw); + + assertThat(ctx.toString()).contains("https://region1.documents.azure.com:443/"); + } +} diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETestsWithGatewayV2.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETestsWithGatewayV2.java index 66d9621cbc1f..4bcc931f7dfa 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETestsWithGatewayV2.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETestsWithGatewayV2.java @@ -9,8 +9,6 @@ import com.azure.cosmos.CosmosAsyncContainer; import com.azure.cosmos.CosmosClientBuilder; import com.azure.cosmos.CosmosDiagnostics; -import com.azure.cosmos.CosmosDiagnosticsContext; -import com.azure.cosmos.CosmosDiagnosticsRequestInfo; import com.azure.cosmos.CosmosException; import com.azure.cosmos.implementation.AsyncDocumentClient; import com.azure.cosmos.implementation.DatabaseAccount; @@ -37,7 +35,6 @@ import com.azure.cosmos.test.faultinjection.FaultInjectionServerErrorType; import com.azure.cosmos.implementation.throughputControl.TestItem; import org.assertj.core.api.AssertionsForClassTypes; -import org.assertj.core.api.Fail; import org.testng.SkipException; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -49,7 +46,6 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Locale; @@ -71,8 +67,6 @@ public class ClientRetryPolicyE2ETestsWithGatewayV2 extends TestSuiteBase { private AccountLevelLocationContext accountLevelReadableLocationContext; private AccountLevelLocationContext accountLevelWritableLocationContext; - private static final String thinClientEndpointIndicator = ":10250/"; - 
@Factory(dataProvider = "clientBuildersWithGatewayAndHttp2") public ClientRetryPolicyE2ETestsWithGatewayV2(CosmosClientBuilder clientBuilder) { super(clientBuilder); @@ -477,32 +471,6 @@ private static void validate(AccountLevelLocationContext accountLevelLocationCon } } - private static void assertThinClientEndpointUsed(CosmosDiagnostics diagnostics) { - AssertionsForClassTypes.assertThat(diagnostics).isNotNull(); - - CosmosDiagnosticsContext ctx = diagnostics.getDiagnosticsContext(); - AssertionsForClassTypes.assertThat(ctx).isNotNull(); - - Collection requests = ctx.getRequestInfo(); - AssertionsForClassTypes.assertThat(requests).isNotNull(); - AssertionsForClassTypes.assertThat(requests.size()).isPositive(); - - for (CosmosDiagnosticsRequestInfo requestInfo : requests) { - logger.info( - "Endpoint: {}, RequestType: {}, Partition: {}/{}, ActivityId: {}", - requestInfo.getEndpoint(), - requestInfo.getRequestType(), - requestInfo.getPartitionId(), - requestInfo.getPartitionKeyRangeId(), - requestInfo.getActivityId()); - if (requestInfo.getEndpoint().contains(thinClientEndpointIndicator)) { - return; - } - } - - Fail.fail("No request targeting thin client proxy endpoint."); - } - private static class AccountLevelLocationContext { private final List serviceOrderedReadableRegions; private final List serviceOrderedWriteableRegions; diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java index bcdd41b615fb..2b213a5c5683 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java @@ -16,6 +16,9 @@ import com.azure.cosmos.CosmosClientBuilder; import com.azure.cosmos.CosmosDatabase; import com.azure.cosmos.CosmosDatabaseForTest; +import com.azure.cosmos.CosmosDiagnostics; +import com.azure.cosmos.CosmosDiagnosticsContext; 
+import com.azure.cosmos.CosmosDiagnosticsRequestInfo; import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfigBuilder; import com.azure.cosmos.CosmosException; import com.azure.cosmos.CosmosNettyLeakDetectorFactory; @@ -100,6 +103,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Objects; @@ -116,6 +120,7 @@ public abstract class TestSuiteBase extends CosmosAsyncClientTest { + protected static final String THIN_CLIENT_ENDPOINT_INDICATOR = ":10250/"; private static final int DEFAULT_BULK_INSERT_CONCURRENCY_LEVEL = 5; private static final ObjectMapper objectMapper = new ObjectMapper(); private static final CosmosItemRequestOptions DEFAULT_DELETE_ITEM_OPTIONS = new CosmosItemRequestOptions() @@ -2004,6 +2009,34 @@ protected static void safeDeleteCollection(AsyncDocumentClient client, String da } } + protected static void assertThinClientEndpointUsed(CosmosDiagnostics diagnostics) { + assertThat(diagnostics).isNotNull(); + + CosmosDiagnosticsContext ctx = diagnostics.getDiagnosticsContext(); + assertThat(ctx).isNotNull(); + + assertThinClientEndpointUsed(ctx); + } + + protected static void assertThinClientEndpointUsed(CosmosDiagnosticsContext ctx) { + assertThat(ctx).isNotNull(); + + Collection requests = ctx.getRequestInfo(); + assertThat(requests).isNotNull(); + assertThat(requests.size()).isPositive(); + + for (CosmosDiagnosticsRequestInfo requestInfo : requests) { + if (requestInfo.getEndpoint() != null + && requestInfo.getEndpoint().contains(THIN_CLIENT_ENDPOINT_INDICATOR)) { + return; + } + } + + assertThat(false) + .as("No request targeting thin client proxy endpoint (" + THIN_CLIENT_ENDPOINT_INDICATOR + ")") + .isTrue(); + } + protected static void safeClose(AsyncDocumentClient client) { if (client != null) { try { diff --git 
a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java index f2bac940b448..a513d4ae84c1 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java @@ -1996,12 +1996,12 @@ public void incrementalChangeFeedModeToFullFidelityChangeFeedMode(boolean isStar if (leaseDocumentsWithNonNullContinuationToken > 0) { assertThrows(IllegalStateException.class, () -> fullFidelityChangeFeedProcessor.start().subscribeOn(Schedulers.boundedElastic()) - .timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)) + .timeout(Duration.ofMillis(10 * CHANGE_FEED_PROCESSOR_TIMEOUT)) .block()); } else { fullFidelityChangeFeedProcessor.start().subscribeOn(Schedulers.boundedElastic()) - .timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)) + .timeout(Duration.ofMillis(10 * CHANGE_FEED_PROCESSOR_TIMEOUT)) .doOnSuccess(ignore -> logger.info("Started FULL_FIDELITY ChangeFeedProcessor successfully!")) .block(); @@ -2201,12 +2201,8 @@ void validateChangeFeedProcessing( throw ex; } - // Poll until all documents are received instead of sleeping the full duration. - // This returns as soon as documents arrive, saving significant time in CI. - long deadline = System.currentTimeMillis() + sleepTime; - while (System.currentTimeMillis() < deadline && receivedDocuments.size() < createdDocuments.size()) { - Thread.sleep(100); - } + // Wait for the feed processor to receive and process the documents. 
+ Thread.sleep(sleepTime); assertThat(changeFeedProcessor.isStarted()).as("Change Feed Processor instance is running").isTrue(); @@ -2228,7 +2224,11 @@ void validateChangeFeedProcessing( assertThat(item.getHostName()).isEqualTo(hostName).as("Change Feed Processor ownership"); } - changeFeedProcessor.stop().subscribeOn(Schedulers.boundedElastic()).timeout(Duration.ofMillis(CHANGE_FEED_PROCESSOR_TIMEOUT)).subscribe(); + // Block on stop to ensure the processor fully releases leases before the caller + // starts another processor on the same lease container (e.g. full fidelity CFP). + changeFeedProcessor.stop().subscribeOn(Schedulers.boundedElastic()) + .timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)) + .block(); for (InternalObjectNode item : createdDocuments) { assertThat(receivedDocuments.containsKey(item.getId())).as("Document with getId: " + item.getId()).isTrue(); diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index cd85fb9aaafe..06768eacbccd 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -10,6 +10,7 @@ #### Bugs Fixed * Fixed Remote Code Execution (RCE) vulnerability (CWE-502) by replacing Java deserialization with JSON-based serialization in `CosmosClientMetadataCachesSnapshot`, `AsyncCache`, and `DocumentCollection`. The metadata cache snapshot now uses Jackson for serialization/deserialization, eliminating the entire class of Java deserialization attacks. - [PR 47971](https://github.com/Azure/azure-sdk-for-java/pull/47971) +* Fixed availability strategy for Gateway V2 (thin client) by ensuring `RegionalRoutingContext` identity is based only on the immutable gateway endpoint. - See [PR 48432](https://github.com/Azure/azure-sdk-for-java/pull/48432) #### Other Changes * Added aggressive HTTP timeout policies for document operations routed to Gateway V2. 
- [PR 47879](https://github.com/Azure/azure-sdk-for-java/pull/47879) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java index e2b18537c7ac..3e4cd5547a65 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java @@ -1064,6 +1064,7 @@ public RxDocumentServiceRequest clone() { rxDocumentServiceRequest.resourceId = this.resourceId; rxDocumentServiceRequest.hasFeedRangeFilteringBeenApplied = this.hasFeedRangeFilteringBeenApplied; rxDocumentServiceRequest.isPerPartitionAutomaticFailoverEnabledAndWriteRequest = this.isPerPartitionAutomaticFailoverEnabledAndWriteRequest; + rxDocumentServiceRequest.partitionKeyDefinition = this.partitionKeyDefinition; return rxDocumentServiceRequest; } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java index 92d1c197525e..7a7025ecdd1b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ThinClientStoreModel.java @@ -2,6 +2,7 @@ // Licensed under the MIT License. 
package com.azure.cosmos.implementation; +import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.ConsistencyLevel; import com.azure.cosmos.implementation.directconnectivity.StoreResponse; import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdConstants; @@ -12,6 +13,7 @@ import com.azure.cosmos.implementation.http.HttpClient; import com.azure.cosmos.implementation.http.HttpHeaders; import com.azure.cosmos.implementation.http.HttpRequest; +import com.azure.cosmos.implementation.caches.RxClientCollectionCache; import com.azure.cosmos.implementation.routing.HexConvert; import com.azure.cosmos.implementation.routing.PartitionKeyInternal; import io.netty.buffer.ByteBuf; @@ -184,6 +186,36 @@ protected boolean partitionKeyRangeResolutionNeeded(RxDocumentServiceRequest req && request.requestContext.resolvedPartitionKeyRange == null && request.getPartitionKeyRangeIdentity() != null; } + + @Override + public Mono performRequestInternal(RxDocumentServiceRequest request, URI requestUri) { + // Ensure partitionKeyDefinition is resolved from the collection cache before + // reaching wrapInHttpRequest, which needs it for client-side EPK computation. + // This handles cases where clone() or other code paths didn't propagate partitionKeyDefinition. + if (request.getPartitionKeyInternal() != null && request.getPartitionKeyDefinition() == null) { + RxClientCollectionCache cache = this.getCollectionCache(); + if (cache != null) { + return cache + .resolveCollectionAsync( + BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics), + request) + .flatMap(collectionHolder -> { + if (collectionHolder.v != null) { + request.setPartitionKeyDefinition(collectionHolder.v.getPartitionKey()); + } else { + throw new NullPointerException( + "Collection cache returned null for request to " + + request.getResourceAddress() + + ". 
Cannot resolve partitionKeyDefinition for client-side EPK computation."); + } + return super.performRequestInternal(request, requestUri); + }); + } + } + + return super.performRequestInternal(request, requestUri); + } + @Override public HttpRequest wrapInHttpRequest(RxDocumentServiceRequest request, URI requestUri) throws Exception { if (this.globalDatabaseAccountName == null) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/RegionalRoutingContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/RegionalRoutingContext.java index ec185ed87be7..9d1a46c7a54a 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/RegionalRoutingContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/RegionalRoutingContext.java @@ -36,29 +36,25 @@ public URI getThinclientRegionalEndpoint() { return this.thinclientRegionalEndpoint; } + // equals and hashCode should only take dependency on gatewayRegionalEndpoint + // because map lookups are done on RegionalRoutingContext with only the gateway regional endpoint. + // toString includes both endpoints for diagnostic visibility. 
@Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; RegionalRoutingContext that = (RegionalRoutingContext) o; - if (this.thinclientRegionalEndpoint != null) { - return this.gatewayRegionalEndpoint.equals(that.gatewayRegionalEndpoint) && - this.thinclientRegionalEndpoint.equals(that.thinclientRegionalEndpoint); - } else { - return this.gatewayRegionalEndpoint.equals(that.gatewayRegionalEndpoint); - } + return this.gatewayRegionalEndpoint.equals(that.gatewayRegionalEndpoint); } @Override public int hashCode() { - return Objects.hash(this.gatewayRegionalEndpoint, this.thinclientRegionalEndpoint); + return Objects.hash(this.gatewayRegionalEndpoint); } @Override public String toString() { - return "RegionalRoutingContext{" + - "gatewayRegionalEndpoint=" + gatewayRegionalEndpointAsString + - ", thinclientRegionalEndpoint=" + thinclientRegionalEndpointAsString + - '}'; + return "RegionalRoutingContext{gw=" + gatewayRegionalEndpointAsString + + ", tc=" + thinclientRegionalEndpointAsString + "}"; } } diff --git a/sdk/cosmos/docs/dr-drill-2026-03-19/chart1_round1_writes.png b/sdk/cosmos/docs/dr-drill-2026-03-19/chart1_round1_writes.png new file mode 100644 index 000000000000..8f0910481686 Binary files /dev/null and b/sdk/cosmos/docs/dr-drill-2026-03-19/chart1_round1_writes.png differ diff --git a/sdk/cosmos/docs/dr-drill-2026-03-19/chart2_round1_reads.png b/sdk/cosmos/docs/dr-drill-2026-03-19/chart2_round1_reads.png new file mode 100644 index 000000000000..c5792545529c Binary files /dev/null and b/sdk/cosmos/docs/dr-drill-2026-03-19/chart2_round1_reads.png differ diff --git a/sdk/cosmos/docs/dr-drill-2026-03-19/chart3_round2_all.png b/sdk/cosmos/docs/dr-drill-2026-03-19/chart3_round2_all.png new file mode 100644 index 000000000000..ad92dd0f6cf6 Binary files /dev/null and b/sdk/cosmos/docs/dr-drill-2026-03-19/chart3_round2_all.png differ diff --git 
a/sdk/cosmos/docs/dr-drill-2026-03-19/chart4_errors.png b/sdk/cosmos/docs/dr-drill-2026-03-19/chart4_errors.png new file mode 100644 index 000000000000..9b29309bf5cb Binary files /dev/null and b/sdk/cosmos/docs/dr-drill-2026-03-19/chart4_errors.png differ diff --git a/sdk/cosmos/docs/dr-drill-2026-03-19/chart5_overview.png b/sdk/cosmos/docs/dr-drill-2026-03-19/chart5_overview.png new file mode 100644 index 000000000000..ef6a4424ba8c Binary files /dev/null and b/sdk/cosmos/docs/dr-drill-2026-03-19/chart5_overview.png differ diff --git a/sdk/cosmos/docs/dr-drill-2026-03-19/chart5_successes.png b/sdk/cosmos/docs/dr-drill-2026-03-19/chart5_successes.png new file mode 100644 index 000000000000..7fc4b25e06f5 Binary files /dev/null and b/sdk/cosmos/docs/dr-drill-2026-03-19/chart5_successes.png differ diff --git a/sdk/cosmos/docs/dr-drill-2026-03-19/chart5_write_successes.png b/sdk/cosmos/docs/dr-drill-2026-03-19/chart5_write_successes.png new file mode 100644 index 000000000000..26edb723fd35 Binary files /dev/null and b/sdk/cosmos/docs/dr-drill-2026-03-19/chart5_write_successes.png differ diff --git a/sdk/cosmos/docs/dr-drill-2026-03-19/chart6_read_successes.png b/sdk/cosmos/docs/dr-drill-2026-03-19/chart6_read_successes.png new file mode 100644 index 000000000000..ffaa76e00c33 Binary files /dev/null and b/sdk/cosmos/docs/dr-drill-2026-03-19/chart6_read_successes.png differ diff --git a/sdk/cosmos/docs/dr-drill-2026-03-19/chart6_successes.png b/sdk/cosmos/docs/dr-drill-2026-03-19/chart6_successes.png new file mode 100644 index 000000000000..4088f539e1d3 Binary files /dev/null and b/sdk/cosmos/docs/dr-drill-2026-03-19/chart6_successes.png differ diff --git a/sdk/cosmos/tests.yml b/sdk/cosmos/tests.yml index 69d782fcc9a0..92bbc0347acb 100644 --- a/sdk/cosmos/tests.yml +++ b/sdk/cosmos/tests.yml @@ -129,7 +129,7 @@ extends: TestResultsFiles: '**/junitreports/TEST-*.xml' AdditionalVariables: - name: AdditionalArgs - value: '-DCOSMOS.CLIENT_LEAK_DETECTION_ENABLED=true 
-DACCOUNT_HOST=$(thin-client-canary-multi-region-session-endpoint) -DACCOUNT_KEY=$(thin-client-canary-multi-region-session-key) -DCOSMOS.THINCLIENT_ENABLED=true' + value: '-DCOSMOS.CLIENT_LEAK_DETECTION_ENABLED=true -DACCOUNT_HOST=$(thin-client-canary-multi-region-session-endpoint) -DACCOUNT_KEY=$(thin-client-canary-multi-region-session-key) -DCOSMOS.THINCLIENT_ENABLED=true -DCOSMOS.HTTP2_ENABLED=true' - template: /eng/pipelines/templates/stages/archetype-sdk-tests-isolated.yml parameters: @@ -161,7 +161,7 @@ extends: TestResultsFiles: '**/junitreports/TEST-*.xml' AdditionalVariables: - name: AdditionalArgs - value: '-DCOSMOS.CLIENT_LEAK_DETECTION_ENABLED=true -DACCOUNT_HOST=$(thin-client-canary-multi-writer-session-endpoint) -DACCOUNT_KEY=$(thin-client-canary-multi-writer-session-key) -DCOSMOS.THINCLIENT_ENABLED=true' + value: '-DCOSMOS.CLIENT_LEAK_DETECTION_ENABLED=true -DACCOUNT_HOST=$(thin-client-canary-multi-writer-session-endpoint) -DACCOUNT_KEY=$(thin-client-canary-multi-writer-session-key) -DCOSMOS.THINCLIENT_ENABLED=true -DCOSMOS.HTTP2_ENABLED=true' - template: /eng/pipelines/templates/stages/archetype-sdk-tests-isolated.yml parameters: