diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerOpenConnectionsAndInitCachesTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerOpenConnectionsAndInitCachesTest.java index 2bbb3ee99338..3c4c9f3202e3 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerOpenConnectionsAndInitCachesTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerOpenConnectionsAndInitCachesTest.java @@ -130,7 +130,9 @@ public void openConnectionsAndInitCachesForDirectMode(boolean useAsync) throws I assertThat(provider.count()).isEqualTo(0); assertThat(collectionInfoByNameMap.size()).isEqualTo(0); - assertThat(routingMap.size()).isEqualTo(0); + // The partition-key-range (routing map) cache is shared across clients targeting the same + // service endpoint, so its size reflects every container routed to in this JVM. Assert on this + // container's own entry (populated by openConnectionsAndInitCaches) instead of the total size. 
assertThat(ReflectionUtils.isInitialized(asyncContainer).get()).isFalse(); // Calling it twice to make sure no side effect of second time no-op call @@ -143,7 +145,8 @@ public void openConnectionsAndInitCachesForDirectMode(boolean useAsync) throws I } assertThat(collectionInfoByNameMap.size()).isEqualTo(1); - assertThat(routingMap.size()).isEqualTo(1); + String collectionRid = asyncContainer.read().block().getProperties().getResourceId(); + assertThat(routingMap).containsKey(collectionRid); assertThat(ReflectionUtils.isInitialized(asyncContainer).get()).isTrue(); GlobalAddressResolver globalAddressResolver = ReflectionUtils.getGlobalAddressResolver(rxDocumentClient); @@ -214,11 +217,12 @@ public void openConnectionsAndInitCachesForGatewayMode(boolean useAsync) { RxDocumentClientImpl rxDocumentClient = (RxDocumentClientImpl) asyncClient.getDocClientWrapper(); - ConcurrentHashMap routingMap = getRoutingMap(rxDocumentClient); ConcurrentHashMap collectionInfoByNameMap = getCollectionInfoByNameMap(rxDocumentClient); assertThat(collectionInfoByNameMap.size()).isEqualTo(0); - assertThat(routingMap.size()).isEqualTo(0); + // The routing-map (partition-key-range) cache is shared per service endpoint; gateway-mode + // openConnectionsAndInitCaches does not populate it, but a sibling direct-mode test may have, + // so the shared routing map's size is not asserted here. 
assertThat(ReflectionUtils.isInitialized(asyncContainer).get()).isFalse(); // Verifying no error when initializeContainer called on gateway mode @@ -232,7 +236,6 @@ public void openConnectionsAndInitCachesForGatewayMode(boolean useAsync) { } assertThat(collectionInfoByNameMap.size()).isEqualTo(0); - assertThat(routingMap.size()).isEqualTo(0); assertThat(ReflectionUtils.isInitialized(asyncContainer).get()).isTrue(); } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/SharedPartitionKeyRangeCacheE2ETest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/SharedPartitionKeyRangeCacheE2ETest.java new file mode 100644 index 000000000000..b867c8cc3238 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/SharedPartitionKeyRangeCacheE2ETest.java @@ -0,0 +1,200 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos; + +import com.azure.cosmos.implementation.RxDocumentClientImpl; +import com.azure.cosmos.implementation.caches.AsyncCacheNonBlocking; +import com.azure.cosmos.implementation.caches.RxPartitionKeyRangeCache; +import com.azure.cosmos.implementation.caches.SharedPartitionKeyRangeCacheRegistry; +import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils; +import com.azure.cosmos.implementation.routing.CollectionRoutingMap; +import com.azure.cosmos.models.CosmosContainerProperties; +import com.azure.cosmos.models.CosmosItemRequestOptions; +import com.azure.cosmos.models.CosmosItemResponse; +import com.azure.cosmos.models.PartitionKey; +import com.azure.cosmos.models.ThroughputProperties; +import com.azure.cosmos.rx.TestSuiteBase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; + +import java.lang.reflect.Method; +import java.net.URI; +import 
java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * End-to-end tests for {@link SharedPartitionKeyRangeCacheRegistry}: spin up real + * {@link CosmosAsyncClient} instances, perform partition-key-routed operations to + * populate the routing-map cache, and verify the registry's sharing semantics. + * + *

<p>Sharing is keyed by the service endpoint {@link URI} configured on + * {@link CosmosClientBuilder}. Two clients configured with the same endpoint + * URI share the cache; clients configured with different endpoint URIs (e.g. + * the global endpoint vs a regional endpoint of the same logical account) do + * not share — see {@link SharedPartitionKeyRangeCacheRegistry} javadoc + * for the rationale.</p>

+ */ +public class SharedPartitionKeyRangeCacheE2ETest extends TestSuiteBase { + private static final Logger logger = LoggerFactory.getLogger(SharedPartitionKeyRangeCacheE2ETest.class); + + private static final int TIMEOUT = 90_000; + private static final int SETUP_TIMEOUT = 60_000; + private static final int SHUTDOWN_TIMEOUT = 30_000; + + private CosmosAsyncClient setupClient; + private CosmosAsyncDatabase database; + private CosmosAsyncContainer container; + private URI serviceEndpoint; + + @Factory(dataProvider = "simpleGatewayClient") + public SharedPartitionKeyRangeCacheE2ETest(CosmosClientBuilder clientBuilder) { + super(clientBuilder); + } + + @BeforeClass(groups = {"emulator", "fast"}, timeOut = SETUP_TIMEOUT) + public void before() { + this.setupClient = getClientBuilder().buildAsyncClient(); + this.database = getSharedCosmosDatabase(this.setupClient); + + String containerId = "pkr-share-e2e-" + UUID.randomUUID(); + CosmosContainerProperties properties = + new CosmosContainerProperties(containerId, "/mypk"); + this.database + .createContainer(properties, ThroughputProperties.createManualThroughput(400)) + .block(); + this.container = this.database.getContainer(containerId); + + this.serviceEndpoint = serviceEndpointOf(this.setupClient); + assertThat(this.serviceEndpoint) + .as("service endpoint must be available after client init") + .isNotNull(); + } + + @AfterClass(groups = {"emulator", "fast"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + public void after() { + if (this.container != null) { + try { + this.container.delete().block(); + } catch (Exception e) { + logger.warn("Failed to delete e2e container", e); + } + } + safeClose(this.setupClient); + } + + /** + * Two {@link CosmosAsyncClient} instances configured with the same service + * endpoint must share the underlying {@link AsyncCacheNonBlocking} routing-map + * storage, and the registry refcount must reflect both holders. 
+ */ + @Test(groups = {"emulator", "fast"}, timeOut = TIMEOUT) + public void twoClientsOnSameEndpointShareRoutingMapStorage() { + CosmosAsyncClient clientA = null; + CosmosAsyncClient clientB = null; + try { + clientA = getClientBuilder().buildAsyncClient(); + clientB = getClientBuilder().buildAsyncClient(); + + // Trigger PK-routed operations on both clients so the routing-map cache populates. + TestObject seed = TestObject.create(); + createItem(clientA, seed); + readItemSilently(clientA, seed.getMypk()); + readItemSilently(clientB, seed.getMypk()); + + AsyncCacheNonBlocking storageA = routingMapStorageOf(clientA); + AsyncCacheNonBlocking storageB = routingMapStorageOf(clientB); + + assertThat(storageA) + .as("Two CosmosAsyncClients on the same endpoint must share the routing-map AsyncCacheNonBlocking instance") + .isSameAs(storageB); + + int refCount = registryReferenceCount(this.serviceEndpoint); + assertThat(refCount) + .as("Registry refcount for endpoint [%s] must include both clients", this.serviceEndpoint) + .isGreaterThanOrEqualTo(2); + + ConcurrentHashMap values = + ReflectionUtils.getValueMapNonBlockingCache(storageA); + assertThat(values) + .as("Routing-map cache must contain at least one entry after PK-routed reads") + .isNotEmpty(); + } finally { + int refCountBeforeClose = registryReferenceCount(this.serviceEndpoint); + safeClose(clientA); + int refCountAfterFirstClose = registryReferenceCount(this.serviceEndpoint); + assertThat(refCountAfterFirstClose) + .as("Closing one client must drop the registry refcount by exactly one") + .isEqualTo(refCountBeforeClose - 1); + + safeClose(clientB); + int refCountAfterSecondClose = registryReferenceCount(this.serviceEndpoint); + assertThat(refCountAfterSecondClose) + .as("Closing both test clients must drop refcount by two (setup client may still hold a reference)") + .isEqualTo(refCountBeforeClose - 2); + } + } + + // --- helpers ---------------------------------------------------------------- + + private void 
createItem(CosmosAsyncClient client, TestObject item) { + CosmosAsyncContainer c = client + .getDatabase(this.database.getId()) + .getContainer(this.container.getId()); + c.createItem(item, new PartitionKey(item.getMypk()), new CosmosItemRequestOptions()).block(); + } + + private void readItemSilently(CosmosAsyncClient client, String pk) { + // The cache is populated by the resolve step regardless of whether the doc exists; + // we issue a random-id read and tolerate 404. + CosmosAsyncContainer c = client + .getDatabase(this.database.getId()) + .getContainer(this.container.getId()); + try { + CosmosItemResponse resp = c.readItem( + UUID.randomUUID().toString(), + new PartitionKey(pk), + new CosmosItemRequestOptions(), + TestObject.class).block(); + assertThat(resp).isNotNull(); + } catch (CosmosException ex) { + if (ex.getStatusCode() != 404) { + throw ex; + } + } + } + + private static AsyncCacheNonBlocking routingMapStorageOf(CosmosAsyncClient client) { + RxDocumentClientImpl rxDocumentClient = + (RxDocumentClientImpl) CosmosBridgeInternal.getAsyncDocumentClient(client); + RxPartitionKeyRangeCache partitionKeyRangeCache = rxDocumentClient.getPartitionKeyRangeCache(); + return ReflectionUtils.getRoutingMapAsyncCacheNonBlocking(partitionKeyRangeCache); + } + + private static URI serviceEndpointOf(CosmosAsyncClient client) { + RxDocumentClientImpl rxDocumentClient = + (RxDocumentClientImpl) CosmosBridgeInternal.getAsyncDocumentClient(client); + return rxDocumentClient.getServiceEndpoint(); + } + + /** + * The registry's {@code referenceCount} accessor is package-private (test-only). + * Reflect into it from this package; widening visibility for a test-only check + * would pollute the implementation class's surface. 
+ */ + private static int registryReferenceCount(URI endpoint) { + try { + SharedPartitionKeyRangeCacheRegistry registry = SharedPartitionKeyRangeCacheRegistry.getInstance(); + Method m = SharedPartitionKeyRangeCacheRegistry.class.getDeclaredMethod("referenceCount", URI.class); + m.setAccessible(true); + return (Integer) m.invoke(registry, endpoint); + } catch (ReflectiveOperationException e) { + throw new RuntimeException("Failed to reflect SharedPartitionKeyRangeCacheRegistry.referenceCount", e); + } + } +} diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionMetadataRequestRuleTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionMetadataRequestRuleTests.java index e7ed9ff15a69..92f477c8d422 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionMetadataRequestRuleTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionMetadataRequestRuleTests.java @@ -610,25 +610,32 @@ public void faultInjectionServerErrorRuleTests_PartitionKeyRanges_DelayError( assertThat(metadataDiagnosticList.size()).isGreaterThan(0); - JsonNode pkRangesLookup = null; + // PARTITION_KEY_RANGE_LOOK_UP is recorded once per cache lookup (cache hits and the + // actual /pkranges fetch alike), so several entries can be present. The injected delay + // only applies to the real network fetch, so pick the entry with the largest duration. 
+ boolean pkRangesLookupFound = false; + long maxPkRangesLookupDurationInMs = -1; for (int i = 0; i < metadataDiagnosticList.size(); i++) { - if (metadataDiagnosticList - .get(i) + JsonNode metadataDiagnostic = metadataDiagnosticList.get(i); + if (metadataDiagnostic .get("metaDataName") .asText() .equalsIgnoreCase(MetadataDiagnosticsContext.MetadataType.PARTITION_KEY_RANGE_LOOK_UP.name())) { - pkRangesLookup = metadataDiagnosticList.get(i); - break; + pkRangesLookupFound = true; + maxPkRangesLookupDurationInMs = + Math.max(maxPkRangesLookupDurationInMs, metadataDiagnostic.get("durationinMS").asLong()); } } - assertThat(pkRangesLookup).isNotNull(); + assertThat(pkRangesLookupFound) + .as("At least one PARTITION_KEY_RANGE_LOOK_UP diagnostic must be present") + .isTrue(); if (faultInjectionServerErrorType == FaultInjectionServerErrorType.CONNECTION_DELAY) { - assertThat(pkRangesLookup.get("durationinMS").asLong()).isGreaterThanOrEqualTo(45 * 1000 * Math.min(applyLimit, 3)); // the duration will be at least one connection timeout + assertThat(maxPkRangesLookupDurationInMs).isGreaterThanOrEqualTo(45 * 1000 * Math.min(applyLimit, 3)); // the duration will be at least one connection timeout } if (faultInjectionServerErrorType == FaultInjectionServerErrorType.RESPONSE_DELAY) { - assertThat(pkRangesLookup.get("durationinMS").asLong()).isGreaterThanOrEqualTo(500 * Math.min(applyLimit, 3)); // the duration will be at least one response timeout + assertThat(maxPkRangesLookupDurationInMs).isGreaterThanOrEqualTo(500 * Math.min(applyLimit, 3)); // the duration will be at least one response timeout } } catch (CosmosException cosmosException) { diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/caches/RxPartitionKeyRangeCacheTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/caches/RxPartitionKeyRangeCacheTest.java index ed0b4491dbd7..cf9aeb7642e2 100644 --- 
a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/caches/RxPartitionKeyRangeCacheTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/caches/RxPartitionKeyRangeCacheTest.java @@ -20,10 +20,13 @@ import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import java.net.URI; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.concurrent.atomic.AtomicInteger; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Fail.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -35,12 +38,14 @@ public class RxPartitionKeyRangeCacheTest { private RxDocumentClientImpl client; private RxCollectionCache collectionCache; private RxPartitionKeyRangeCache cache; - + @BeforeMethod(groups = "unit") public void before_test() { client = Mockito.mock(RxDocumentClientImpl.class); collectionCache = Mockito.mock(RxCollectionCache.class); - cache = new RxPartitionKeyRangeCache(client, collectionCache); + // Pass null endpoint explicitly to opt into an isolated cache for the + // original test-suite behavior (these tests don't exercise sharing). 
+ cache = new RxPartitionKeyRangeCache(client, collectionCache, (URI) null); } @Test(groups = "unit") @@ -245,4 +250,268 @@ public void tryLookupAsync_RetriesOnceAndConvertsToNotFoundException() { .expectNextMatches(s -> s.v == null) .verifyComplete(); } + + @Test(groups = "unit") + public void twoCachesForSameEndpointShareRoutingMapStorage() throws Exception { + URI endpoint = URI.create("https://test-shared-pkr-1.documents.azure.com:443/"); + + RxDocumentClientImpl clientA = Mockito.mock(RxDocumentClientImpl.class); + RxDocumentClientImpl clientB = Mockito.mock(RxDocumentClientImpl.class); + RxCollectionCache collA = Mockito.mock(RxCollectionCache.class); + RxCollectionCache collB = Mockito.mock(RxCollectionCache.class); + + String collectionRid = "shared-coll-1"; + DocumentCollection collection = new DocumentCollection(); + collection.setResourceId(collectionRid); + collection.setSelfLink("dbs/db1/colls/coll1"); + + PartitionKeyRange range = new PartitionKeyRange(); + range.setId("0"); + range.setMinInclusive(PartitionKeyRange.MINIMUM_INCLUSIVE_EFFECTIVE_PARTITION_KEY); + range.setMaxExclusive(PartitionKeyRange.MAXIMUM_EXCLUSIVE_EFFECTIVE_PARTITION_KEY); + + FeedResponse response = Mockito.mock(FeedResponse.class); + when(response.getResults()).thenReturn(Arrays.asList(range)); + when(response.getContinuationToken()).thenReturn("etag-1"); + + AtomicInteger clientACalls = new AtomicInteger(); + AtomicInteger clientBCalls = new AtomicInteger(); + + when(collA.resolveCollectionAsync(any(), any())) + .thenReturn(Mono.just(new Utils.ValueHolder<>(collection))); + when(clientA.readPartitionKeyRanges(eq(collection.getSelfLink()), any(CosmosQueryRequestOptions.class))) + .thenAnswer(invocation -> { + clientACalls.incrementAndGet(); + return Flux.just(response); + }); + + when(collB.resolveCollectionAsync(any(), any())) + .thenReturn(Mono.just(new Utils.ValueHolder<>(collection))); + when(clientB.readPartitionKeyRanges(eq(collection.getSelfLink()), 
any(CosmosQueryRequestOptions.class))) + .thenAnswer(invocation -> { + clientBCalls.incrementAndGet(); + return Flux.just(response); + }); + + RxPartitionKeyRangeCache cacheA = new RxPartitionKeyRangeCache(clientA, collA, endpoint); + RxPartitionKeyRangeCache cacheB = new RxPartitionKeyRangeCache(clientB, collB, endpoint); + + try { + StepVerifier.create(cacheA.tryLookupAsync(null, collectionRid, null, new HashMap<>())) + .expectNextMatches(v -> v != null && v.v != null) + .verifyComplete(); + + StepVerifier.create(cacheB.tryLookupAsync(null, collectionRid, null, new HashMap<>())) + .expectNextMatches(v -> v != null && v.v != null) + .verifyComplete(); + + assertThat(clientACalls.get()).isEqualTo(1); + assertThat(clientBCalls.get()).isZero(); + } finally { + cacheA.close(); + cacheB.close(); + } + + assertThat(SharedPartitionKeyRangeCacheRegistry.getInstance().referenceCount(endpoint)) + .as("close() releases the shared cache reference") + .isZero(); + } + + @Test(groups = "unit") + public void cachesForDifferentEndpointsDoNotShareStorage() throws Exception { + URI endpointA = URI.create("https://test-shared-pkr-2a.documents.azure.com:443/"); + URI endpointB = URI.create("https://test-shared-pkr-2b.documents.azure.com:443/"); + + RxDocumentClientImpl clientA = Mockito.mock(RxDocumentClientImpl.class); + RxDocumentClientImpl clientB = Mockito.mock(RxDocumentClientImpl.class); + RxCollectionCache collA = Mockito.mock(RxCollectionCache.class); + RxCollectionCache collB = Mockito.mock(RxCollectionCache.class); + + String collectionRid = "shared-coll-2"; + DocumentCollection collection = new DocumentCollection(); + collection.setResourceId(collectionRid); + collection.setSelfLink("dbs/db1/colls/coll2"); + + PartitionKeyRange range = new PartitionKeyRange(); + range.setId("0"); + range.setMinInclusive(PartitionKeyRange.MINIMUM_INCLUSIVE_EFFECTIVE_PARTITION_KEY); + range.setMaxExclusive(PartitionKeyRange.MAXIMUM_EXCLUSIVE_EFFECTIVE_PARTITION_KEY); + + FeedResponse 
response = Mockito.mock(FeedResponse.class); + when(response.getResults()).thenReturn(Arrays.asList(range)); + when(response.getContinuationToken()).thenReturn("etag-2"); + + AtomicInteger clientACalls = new AtomicInteger(); + AtomicInteger clientBCalls = new AtomicInteger(); + + when(collA.resolveCollectionAsync(any(), any())) + .thenReturn(Mono.just(new Utils.ValueHolder<>(collection))); + when(clientA.readPartitionKeyRanges(eq(collection.getSelfLink()), any(CosmosQueryRequestOptions.class))) + .thenAnswer(invocation -> { + clientACalls.incrementAndGet(); + return Flux.just(response); + }); + when(collB.resolveCollectionAsync(any(), any())) + .thenReturn(Mono.just(new Utils.ValueHolder<>(collection))); + when(clientB.readPartitionKeyRanges(eq(collection.getSelfLink()), any(CosmosQueryRequestOptions.class))) + .thenAnswer(invocation -> { + clientBCalls.incrementAndGet(); + return Flux.just(response); + }); + + RxPartitionKeyRangeCache cacheA = new RxPartitionKeyRangeCache(clientA, collA, endpointA); + RxPartitionKeyRangeCache cacheB = new RxPartitionKeyRangeCache(clientB, collB, endpointB); + + try { + StepVerifier.create(cacheA.tryLookupAsync(null, collectionRid, null, new HashMap<>())) + .expectNextMatches(v -> v != null && v.v != null) + .verifyComplete(); + + StepVerifier.create(cacheB.tryLookupAsync(null, collectionRid, null, new HashMap<>())) + .expectNextMatches(v -> v != null && v.v != null) + .verifyComplete(); + + assertThat(clientACalls.get()).isEqualTo(1); + assertThat(clientBCalls.get()).isEqualTo(1); + } finally { + cacheA.close(); + cacheB.close(); + } + } + + @Test(groups = "unit") + public void closeIsIdempotent() throws Exception { + URI endpoint = URI.create("https://test-shared-pkr-3.documents.azure.com:443/"); + RxDocumentClientImpl mockClient = Mockito.mock(RxDocumentClientImpl.class); + RxCollectionCache mockColl = Mockito.mock(RxCollectionCache.class); + + RxPartitionKeyRangeCache c = new RxPartitionKeyRangeCache(mockClient, mockColl, 
endpoint); + assertThat(SharedPartitionKeyRangeCacheRegistry.getInstance().referenceCount(endpoint)) + .isEqualTo(1); + + c.close(); + c.close(); // second call must be a no-op + c.close(); + + assertThat(SharedPartitionKeyRangeCacheRegistry.getInstance().referenceCount(endpoint)) + .as("repeated close() must not drive refcount negative") + .isZero(); + } + + @Test(groups = "unit") + public void clientWithServiceEndpointAcquiresAndReleasesRegistryRefcount() throws Exception { + // Regression-guard for the RxDocumentClientImpl.close() -> partitionKeyRangeCache.close() + // wiring: constructing the cache must bump the registry refcount; close() must drop it. + URI endpoint = URI.create("https://test-pkr-lifecycle.documents.azure.com:443/"); + RxDocumentClientImpl mockClient = Mockito.mock(RxDocumentClientImpl.class); + RxCollectionCache mockColl = Mockito.mock(RxCollectionCache.class); + + int before = SharedPartitionKeyRangeCacheRegistry.getInstance().referenceCount(endpoint); + + // Mirrors what RxDocumentClientImpl actually uses: 3-arg ctor with the service endpoint URI. + RxPartitionKeyRangeCache c = new RxPartitionKeyRangeCache(mockClient, mockColl, endpoint); + try { + assertThat(SharedPartitionKeyRangeCacheRegistry.getInstance().referenceCount(endpoint)) + .isEqualTo(before + 1); + } finally { + c.close(); + } + assertThat(SharedPartitionKeyRangeCacheRegistry.getInstance().referenceCount(endpoint)) + .isEqualTo(before); + } + + @Test(groups = "unit") + public void forceRefreshOnSharedCacheIsVisibleToSiblingClient() throws Exception { + // Cross-client invalidation propagation: client A force-refreshes a routing map, + // the new value must be visible to client B's next lookup. 
+ URI endpoint = URI.create("https://test-shared-pkr-refresh.documents.azure.com:443/"); + + RxDocumentClientImpl clientA = Mockito.mock(RxDocumentClientImpl.class); + RxDocumentClientImpl clientB = Mockito.mock(RxDocumentClientImpl.class); + RxCollectionCache collA = Mockito.mock(RxCollectionCache.class); + RxCollectionCache collB = Mockito.mock(RxCollectionCache.class); + + String collectionRid = "refresh-coll-1"; + DocumentCollection collection = new DocumentCollection(); + collection.setResourceId(collectionRid); + collection.setSelfLink("dbs/db1/colls/coll1"); + + PartitionKeyRange rangeBefore = new PartitionKeyRange(); + rangeBefore.setId("0"); + rangeBefore.setMinInclusive(PartitionKeyRange.MINIMUM_INCLUSIVE_EFFECTIVE_PARTITION_KEY); + rangeBefore.setMaxExclusive(PartitionKeyRange.MAXIMUM_EXCLUSIVE_EFFECTIVE_PARTITION_KEY); + + // After refresh: a split scenario produces two child ranges with the original as parent. + PartitionKeyRange rangeAfter1 = new PartitionKeyRange(); + rangeAfter1.setId("1"); + rangeAfter1.setMinInclusive(PartitionKeyRange.MINIMUM_INCLUSIVE_EFFECTIVE_PARTITION_KEY); + rangeAfter1.setMaxExclusive("80"); + rangeAfter1.setParents(Arrays.asList("0")); + PartitionKeyRange rangeAfter2 = new PartitionKeyRange(); + rangeAfter2.setId("2"); + rangeAfter2.setMinInclusive("80"); + rangeAfter2.setMaxExclusive(PartitionKeyRange.MAXIMUM_EXCLUSIVE_EFFECTIVE_PARTITION_KEY); + rangeAfter2.setParents(Arrays.asList("0")); + + FeedResponse responseBefore = Mockito.mock(FeedResponse.class); + when(responseBefore.getResults()).thenReturn(Arrays.asList(rangeBefore)); + when(responseBefore.getContinuationToken()).thenReturn("etag-before"); + + FeedResponse responseAfter = Mockito.mock(FeedResponse.class); + when(responseAfter.getResults()).thenReturn(Arrays.asList(rangeAfter1, rangeAfter2)); + when(responseAfter.getContinuationToken()).thenReturn("etag-after"); + + when(collA.resolveCollectionAsync(any(), any())) + .thenReturn(Mono.just(new 
Utils.ValueHolder<>(collection))); + when(collB.resolveCollectionAsync(any(), any())) + .thenReturn(Mono.just(new Utils.ValueHolder<>(collection))); + + // Client A first returns the pre-split layout, then the post-split layout on refresh. + AtomicInteger clientACalls = new AtomicInteger(); + when(clientA.readPartitionKeyRanges(eq(collection.getSelfLink()), any(CosmosQueryRequestOptions.class))) + .thenAnswer(invocation -> { + int n = clientACalls.incrementAndGet(); + return n == 1 ? Flux.just(responseBefore) : Flux.just(responseAfter); + }); + AtomicInteger clientBCalls = new AtomicInteger(); + when(clientB.readPartitionKeyRanges(eq(collection.getSelfLink()), any(CosmosQueryRequestOptions.class))) + .thenAnswer(invocation -> { + clientBCalls.incrementAndGet(); + return Flux.just(responseAfter); + }); + + RxPartitionKeyRangeCache cacheA = new RxPartitionKeyRangeCache(clientA, collA, endpoint); + RxPartitionKeyRangeCache cacheB = new RxPartitionKeyRangeCache(clientB, collB, endpoint); + + try { + // Step 1: A populates the shared cache with the pre-split routing map. + CollectionRoutingMap[] beforeMapHolder = new CollectionRoutingMap[1]; + StepVerifier.create(cacheA.tryLookupAsync(null, collectionRid, null, new HashMap<>())) + .consumeNextWith(v -> beforeMapHolder[0] = v.v) + .verifyComplete(); + assertThat(beforeMapHolder[0]).isNotNull(); + + // Step 2: A force-refreshes (passing previousValue == current cached map). + CollectionRoutingMap[] afterMapHolder = new CollectionRoutingMap[1]; + StepVerifier.create(cacheA.tryLookupAsync(null, collectionRid, beforeMapHolder[0], new HashMap<>())) + .consumeNextWith(v -> afterMapHolder[0] = v.v) + .verifyComplete(); + assertThat(afterMapHolder[0]).isNotSameAs(beforeMapHolder[0]); + + // Step 3: B's lookup must see A's refreshed value (no fresh fetch from B). 
+ StepVerifier.create(cacheB.tryLookupAsync(null, collectionRid, null, new HashMap<>())) + .consumeNextWith(v -> assertThat(v.v).isSameAs(afterMapHolder[0])) + .verifyComplete(); + + assertThat(clientACalls.get()) + .as("A populated then refreshed -> 2 calls") + .isEqualTo(2); + assertThat(clientBCalls.get()) + .as("B must observe A's refresh without issuing its own fetch") + .isZero(); + } finally { + cacheA.close(); + cacheB.close(); + } + } } \ No newline at end of file diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/caches/SharedPartitionKeyRangeCacheRegistryTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/caches/SharedPartitionKeyRangeCacheRegistryTest.java new file mode 100644 index 000000000000..8a5e89563bac --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/caches/SharedPartitionKeyRangeCacheRegistryTest.java @@ -0,0 +1,328 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.caches; + +import com.azure.cosmos.implementation.Configs; +import com.azure.cosmos.implementation.caches.SharedPartitionKeyRangeCacheRegistry.AcquireResult; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Unit tests for {@link SharedPartitionKeyRangeCacheRegistry}. Each test uses a unique endpoint + * and releases every reference, since the registry is a process-wide singleton. 
*/ +public class SharedPartitionKeyRangeCacheRegistryTest { + + private static final String ENABLE_FLAG = "COSMOS.SHARED_PARTITION_KEY_RANGE_CACHE_ENABLED"; + + private String savedFlag; + + @BeforeMethod(groups = "unit") + public void before() { + savedFlag = System.getProperty(ENABLE_FLAG); + System.clearProperty(ENABLE_FLAG); // default is enabled + } + + @AfterMethod(groups = "unit") + public void after() { + if (savedFlag == null) { + System.clearProperty(ENABLE_FLAG); + } else { + System.setProperty(ENABLE_FLAG, savedFlag); + } + } + + @Test(groups = "unit") + public void acquireReturnsSameInstanceForSameEndpoint() { + SharedPartitionKeyRangeCacheRegistry registry = SharedPartitionKeyRangeCacheRegistry.getInstance(); + URI endpoint = URI.create("https://test-acct-share-1.documents.azure.com:443/"); + + AcquireResult ra = registry.acquire(endpoint, new Object()); + AcquireResult rb = registry.acquire(endpoint, new Object()); + + try { + assertThat(ra.cache).isSameAs(rb.cache); + assertThat(registry.referenceCount(endpoint)).isEqualTo(2); + } finally { + registry.release(endpoint, ra.cache, ra.releaseHandle); + registry.release(endpoint, rb.cache, rb.releaseHandle); + } + assertThat(registry.referenceCount(endpoint)).isZero(); + } + + @Test(groups = "unit") + public void acquireReturnsDifferentInstanceForDifferentEndpoints() { + SharedPartitionKeyRangeCacheRegistry registry = SharedPartitionKeyRangeCacheRegistry.getInstance(); + URI e1 = URI.create("https://test-acct-share-2a.documents.azure.com:443/"); + URI e2 = URI.create("https://test-acct-share-2b.documents.azure.com:443/"); + + AcquireResult ra = registry.acquire(e1, new Object()); + AcquireResult rb = registry.acquire(e2, new Object()); + + try { + assertThat(ra.cache).isNotSameAs(rb.cache); + } finally { + registry.release(e1, ra.cache, ra.releaseHandle); + registry.release(e2, rb.cache, rb.releaseHandle); + } + } + + @Test(groups = "unit") + public void 
acquireTreatsHostCaseInsensitivelyMatchingUriEquals() { + // URI.equals is case-insensitive on host (RFC 3986); confirm clients with + // mixed-case host names collapse into one shared entry. + SharedPartitionKeyRangeCacheRegistry registry = SharedPartitionKeyRangeCacheRegistry.getInstance(); + URI lower = URI.create("https://test-acct-share-case.documents.azure.com:443/"); + URI mixed = URI.create("https://Test-Acct-Share-Case.documents.azure.com:443/"); + + assertThat(lower).isEqualTo(mixed); + + AcquireResult ra = registry.acquire(lower, new Object()); + AcquireResult rb = registry.acquire(mixed, new Object()); + + try { + assertThat(ra.cache) + .as("lower-case and mixed-case host must share the same registry entry") + .isSameAs(rb.cache); + assertThat(registry.referenceCount(lower)).isEqualTo(2); + assertThat(registry.referenceCount(mixed)).isEqualTo(2); + } finally { + registry.release(lower, ra.cache, ra.releaseHandle); + registry.release(mixed, rb.cache, rb.releaseHandle); + } + assertThat(registry.referenceCount(lower)).isZero(); + } + + @Test(groups = "unit") + public void regionalAndGlobalEndpointsDoNotShareStorage() { + // Documents the chosen scope: the registry keys on the URI as configured on the + // CosmosClientBuilder. A client configured with the global endpoint and a client + // configured with a regional endpoint of the same logical account use distinct + // entries. Sharing across endpoints would require canonicalising the account id + // returned from regional endpoints (which embeds a service-normalised region + // suffix that cannot be reliably reversed) and is deliberately not attempted. 
+ SharedPartitionKeyRangeCacheRegistry registry = SharedPartitionKeyRangeCacheRegistry.getInstance(); + URI global = URI.create("https://contoso.documents.azure.com:443/"); + URI regional = URI.create("https://contoso-westus.documents.azure.com:443/"); + + AcquireResult ra = registry.acquire(global, new Object()); + AcquireResult rb = registry.acquire(regional, new Object()); + + try { + assertThat(ra.cache) + .as("global and regional endpoint URIs use separate registry entries") + .isNotSameAs(rb.cache); + assertThat(registry.referenceCount(global)).isEqualTo(1); + assertThat(registry.referenceCount(regional)).isEqualTo(1); + } finally { + registry.release(global, ra.cache, ra.releaseHandle); + registry.release(regional, rb.cache, rb.releaseHandle); + } + } + + @Test(groups = "unit") + public void releaseEvictsAtZeroRefcount() { + SharedPartitionKeyRangeCacheRegistry registry = SharedPartitionKeyRangeCacheRegistry.getInstance(); + URI endpoint = URI.create("https://test-acct-share-3.documents.azure.com:443/"); + + AcquireResult ra = registry.acquire(endpoint, new Object()); + AcquireResult rb = registry.acquire(endpoint, new Object()); + assertThat(registry.referenceCount(endpoint)).isEqualTo(2); + + registry.release(endpoint, ra.cache, ra.releaseHandle); + assertThat(registry.referenceCount(endpoint)).isEqualTo(1); + + registry.release(endpoint, rb.cache, rb.releaseHandle); + assertThat(registry.referenceCount(endpoint)).isZero(); + + // After eviction, fresh acquire produces a new cache instance. 
+ AcquireResult rc = registry.acquire(endpoint, new Object()); + try { + assertThat(rc.cache).isNotSameAs(ra.cache); + assertThat(registry.referenceCount(endpoint)).isEqualTo(1); + } finally { + registry.release(endpoint, rc.cache, rc.releaseHandle); + } + } + + @Test(groups = "unit") + public void releaseIsIdempotentWhenSuppliedSameCacheRepeatedly() { + SharedPartitionKeyRangeCacheRegistry registry = SharedPartitionKeyRangeCacheRegistry.getInstance(); + URI endpoint = URI.create("https://test-acct-share-4.documents.azure.com:443/"); + + AcquireResult ra = registry.acquire(endpoint, new Object()); + registry.release(endpoint, ra.cache, ra.releaseHandle); + assertThat(registry.referenceCount(endpoint)).isZero(); + + // Releasing a stale cache reference must not crash or drive refcount negative. + registry.release(endpoint, ra.cache, ra.releaseHandle); + assertThat(registry.referenceCount(endpoint)).isZero(); + } + + @Test(groups = "unit") + public void releaseIsNoOpWhenCacheIsNotTheRegisteredInstance() { + SharedPartitionKeyRangeCacheRegistry registry = SharedPartitionKeyRangeCacheRegistry.getInstance(); + URI endpoint = URI.create("https://test-acct-share-5.documents.azure.com:443/"); + + AcquireResult stale = registry.acquire(endpoint, new Object()); + registry.release(endpoint, stale.cache, stale.releaseHandle); + + AcquireResult current = registry.acquire(endpoint, new Object()); + try { + registry.release(endpoint, stale.cache, null); // stale != current → no-op + assertThat(registry.referenceCount(endpoint)).isEqualTo(1); + } finally { + registry.release(endpoint, current.cache, current.releaseHandle); + } + } + + @Test(groups = "unit") + public void nullEndpointReturnsIsolatedCacheAndDoesNotEnterRegistry() { + SharedPartitionKeyRangeCacheRegistry registry = SharedPartitionKeyRangeCacheRegistry.getInstance(); + int before = registry.registeredEndpointCount(); + + AcquireResult ra = registry.acquire(null, new Object()); + AcquireResult rb = 
registry.acquire(null, new Object()); + + assertThat(ra.cache).isNotSameAs(rb.cache); + assertThat(ra.releaseHandle).isNull(); + assertThat(rb.releaseHandle).isNull(); + assertThat(registry.registeredEndpointCount()).isEqualTo(before); + + registry.release(null, ra.cache, ra.releaseHandle); + registry.release(null, rb.cache, rb.releaseHandle); + } + + @Test(groups = "unit") + public void disabledFlagReturnsIsolatedCachesAndPreservesRegistryEmpty() { + SharedPartitionKeyRangeCacheRegistry registry = SharedPartitionKeyRangeCacheRegistry.getInstance(); + URI endpoint = URI.create("https://test-acct-share-6.documents.azure.com:443/"); + int before = registry.registeredEndpointCount(); + + System.setProperty(ENABLE_FLAG, "false"); + assertThat(Configs.isSharedPartitionKeyRangeCacheEnabled()).isFalse(); + + AcquireResult ra = registry.acquire(endpoint, new Object()); + AcquireResult rb = registry.acquire(endpoint, new Object()); + + try { + assertThat(ra.cache).isNotSameAs(rb.cache); + assertThat(ra.releaseHandle).isNull(); + assertThat(registry.registeredEndpointCount()).isEqualTo(before); + assertThat(registry.referenceCount(endpoint)).isZero(); + } finally { + registry.release(endpoint, ra.cache, ra.releaseHandle); + registry.release(endpoint, rb.cache, rb.releaseHandle); + } + } + + @Test(groups = "unit") + public void concurrentAcquireAndReleaseProducesConsistentRefcount() throws Exception { + SharedPartitionKeyRangeCacheRegistry registry = SharedPartitionKeyRangeCacheRegistry.getInstance(); + URI endpoint = URI.create("https://test-acct-share-7.documents.azure.com:443/"); + + int threads = 32; + int opsPerThread = 200; + ExecutorService pool = Executors.newFixedThreadPool(threads); + CountDownLatch start = new CountDownLatch(1); + CountDownLatch done = new CountDownLatch(threads); + + try { + for (int t = 0; t < threads; t++) { + pool.submit(() -> { + try { + start.await(); + List held = new ArrayList<>(); + for (int i = 0; i < opsPerThread; i++) { + 
held.add(registry.acquire(endpoint, new Object())); + if (i % 3 == 0 && !held.isEmpty()) { + AcquireResult r = held.remove(held.size() - 1); + registry.release(endpoint, r.cache, r.releaseHandle); + } + } + for (AcquireResult r : held) { + registry.release(endpoint, r.cache, r.releaseHandle); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + done.countDown(); + } + }); + } + start.countDown(); + assertThat(done.await(30, TimeUnit.SECONDS)).isTrue(); + } finally { + pool.shutdownNow(); + } + + assertThat(registry.referenceCount(endpoint)).isZero(); + } + + @Test(groups = "unit") + public void referenceManagerReleasesSharedCacheWhenOwnerIsGarbageCollected() throws Exception { + // Owner is allocated in a separate stack frame so this frame can't keep it alive; + // ReferenceManager runs the cleanup once GC observes the owner as phantom-reachable. + SharedPartitionKeyRangeCacheRegistry registry = SharedPartitionKeyRangeCacheRegistry.getInstance(); + URI endpoint = URI.create("https://test-acct-leak-1.documents.azure.com:443/"); + + acquireAndLeakOwner(registry, endpoint); + assertThat(registry.referenceCount(endpoint)).isEqualTo(1); + + boolean released = false; + long deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(15); + while (System.nanoTime() < deadlineNanos) { + System.gc(); + System.runFinalization(); + Thread.sleep(100); + if (registry.referenceCount(endpoint) == 0) { + released = true; + break; + } + } + + assertThat(released) + .as("ReferenceManager should release the shared cache reference after the owner is GC'd " + + "(refcount=%d)", registry.referenceCount(endpoint)) + .isTrue(); + } + + private static void acquireAndLeakOwner(SharedPartitionKeyRangeCacheRegistry registry, URI endpoint) { + Object owner = new Object(); + registry.acquire(endpoint, owner); + // owner falls out of scope on return. 
+ } + + @Test(groups = "unit") + public void promptCloseFulfillsHandleSoReferenceManagerCleanupIsANoop() throws Exception { + SharedPartitionKeyRangeCacheRegistry registry = SharedPartitionKeyRangeCacheRegistry.getInstance(); + URI endpoint = URI.create("https://test-acct-leak-2.documents.azure.com:443/"); + + acquireAndPromptlyClose(registry, endpoint); + assertThat(registry.referenceCount(endpoint)).isZero(); + + // GC + cleanup-action firing must not drive the refcount negative. + for (int i = 0; i < 5; i++) { + System.gc(); + System.runFinalization(); + Thread.sleep(50); + } + assertThat(registry.referenceCount(endpoint)).isZero(); + } + + private static void acquireAndPromptlyClose(SharedPartitionKeyRangeCacheRegistry registry, URI endpoint) { + Object owner = new Object(); + AcquireResult result = registry.acquire(endpoint, owner); + registry.release(endpoint, result.cache, result.releaseHandle); + } +} diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index eee4f87437f9..6b220400e6dc 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -10,6 +10,7 @@ #### Other Changes * Reduced memory footprint of deserialized `PartitionKeyRange` instances by stripping unused fields in the `PartitionKeyRange(ObjectNode)` constructor - See PR [49513](https://github.com/Azure/azure-sdk-for-java/pull/49513). +* Reduced memory footprint and redundant `/pkranges` reads when multiple `CosmosClient` / `CosmosAsyncClient` instances in the same JVM are configured with the same service endpoint. Disable with system property `COSMOS.SHARED_PARTITION_KEY_RANGE_CACHE_ENABLED=false` if needed. 
### 4.81.0 (2026-06-08) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java index cf1a812e10f2..8e46404d573c 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java @@ -218,6 +218,12 @@ public class Configs { private static final String USE_LEGACY_TRACING = "COSMOS.USE_LEGACY_TRACING"; private static final boolean DEFAULT_USE_LEGACY_TRACING = false; + // Whether multiple CosmosClient instances configured with the same service + // endpoint share a single partition-key-range cache. Enabled by default. + private static final String SHARED_PARTITION_KEY_RANGE_CACHE_ENABLED = + "COSMOS.SHARED_PARTITION_KEY_RANGE_CACHE_ENABLED"; + private static final boolean DEFAULT_SHARED_PARTITION_KEY_RANGE_CACHE_ENABLED = true; + // whether to enable replica addresses validation private static final String REPLICA_ADDRESS_VALIDATION_ENABLED = "COSMOS.REPLICA_ADDRESS_VALIDATION_ENABLED"; private static final boolean DEFAULT_REPLICA_ADDRESS_VALIDATION_ENABLED = true; @@ -1082,6 +1088,12 @@ public static boolean useLegacyTracing() { DEFAULT_USE_LEGACY_TRACING); } + public static boolean isSharedPartitionKeyRangeCacheEnabled() { + return getJVMConfigAsBoolean( + SHARED_PARTITION_KEY_RANGE_CACHE_ENABLED, + DEFAULT_SHARED_PARTITION_KEY_RANGE_CACHE_ENABLED); + } + private static int getJVMConfigAsInt(String propName, int defaultValue) { String propValue = System.getProperty(propName); return getIntValue(propValue, defaultValue); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index d4f99c183c24..ad1840225e5a 100644 --- 
a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -925,7 +925,7 @@ public void init(CosmosClientMetadataCachesSnapshot metadataCachesSnapshot, Func this.resetSessionTokenRetryPolicy = new ResetSessionTokenRetryPolicyFactory(this.sessionContainer, this.collectionCache, this.retryPolicy); this.partitionKeyRangeCache = new RxPartitionKeyRangeCache(RxDocumentClientImpl.this, - collectionCache); + collectionCache, this.serviceEndpoint); updateGatewayProxy(); updateThinProxy(); @@ -7465,6 +7465,11 @@ public void close() { this.throughputControlStore.close(); } + if (this.partitionKeyRangeCache != null) { + logger.info("Closing PartitionKeyRangeCache ..."); + LifeCycleUtils.closeQuietly(this.partitionKeyRangeCache); + } + if (this.clientTelemetry != null) { logger.info("Closing ClientTelemetry ..."); this.clientTelemetry.close(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/caches/RxPartitionKeyRangeCache.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/caches/RxPartitionKeyRangeCache.java index b869489c7d75..4943c8311592 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/caches/RxPartitionKeyRangeCache.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/caches/RxPartitionKeyRangeCache.java @@ -31,39 +31,71 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.io.Closeable; +import java.net.URI; import java.time.Instant; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; /** * While this class is public, but it is not part of our published public APIs. 
* This is meant to be internally used only by our sdk. + * + *

<p>The routing-map storage is obtained from {@link SharedPartitionKeyRangeCacheRegistry} + * keyed by the service endpoint URI; clients configured with the same service + * endpoint share it. {@link #close()} releases this instance's reference. The + * fetching logic (network call, collection resolution, diagnostics) remains + * per-client. Clients configured with different endpoint URIs — including + * a regional endpoint versus the global endpoint of the same account — + * do not share.</p>

**/ -public class RxPartitionKeyRangeCache implements IPartitionKeyRangeCache { +public class RxPartitionKeyRangeCache implements IPartitionKeyRangeCache, Closeable { private final Logger logger = LoggerFactory.getLogger(RxPartitionKeyRangeCache.class); private final AsyncCacheNonBlocking routingMapCache; private final RxDocumentClientImpl client; private final RxCollectionCache collectionCache; private final DiagnosticsClientContext clientContext; - - public RxPartitionKeyRangeCache(RxDocumentClientImpl client, RxCollectionCache collectionCache) { - this.routingMapCache = new AsyncCacheNonBlocking<>(); + private final URI sharedCacheEndpointKey; + private final AtomicBoolean closed = new AtomicBoolean(false); + private final SharedPartitionKeyRangeCacheRegistry.ReleaseHandle releaseHandle; + + public RxPartitionKeyRangeCache( + RxDocumentClientImpl client, + RxCollectionCache collectionCache, + URI serviceEndpoint) { + + this.sharedCacheEndpointKey = serviceEndpoint; + SharedPartitionKeyRangeCacheRegistry.AcquireResult acquired = + SharedPartitionKeyRangeCacheRegistry.getInstance().acquire(this.sharedCacheEndpointKey, this); + this.routingMapCache = acquired.cache; + this.releaseHandle = acquired.releaseHandle; this.client = client; this.collectionCache = collectionCache; this.clientContext = client; } + /** Idempotent release of this instance's shared-cache reference. 
*/ + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + SharedPartitionKeyRangeCacheRegistry.getInstance().release( + this.sharedCacheEndpointKey, this.routingMapCache, this.releaseHandle); + } + } + /* (non-Javadoc) * @see IPartitionKeyRangeCache#tryLookupAsync(java.lang.STRING, com.azure.cosmos.internal.routing.CollectionRoutingMap) */ @Override public Mono> tryLookupAsync(MetadataDiagnosticsContext metaDataDiagnosticsContext, String collectionRid, CollectionRoutingMap previousValue, Map properties) { + Instant lookupStart = Instant.now(); return routingMapCache.getAsync( collectionRid, routingMap -> @@ -74,6 +106,7 @@ public Mono> tryLookupAsync(MetadataDiag properties), currentValue -> shouldForceRefresh(previousValue, currentValue)) .map(Utils.ValueHolder::new) + .doFinally(signal -> recordPartitionKeyRangeLookUp(metaDataDiagnosticsContext, lookupStart)) .onErrorResume(err -> { logger.debug("tryLookupAsync on collectionRid {} encountered failure", collectionRid, err); CosmosException dce = Utils.as(err, CosmosException.class); @@ -176,6 +209,7 @@ public Mono> tryGetPartitionKeyRangeByIdAsy */ @Override public Mono> tryGetRangeByPartitionKeyRangeId(MetadataDiagnosticsContext metaDataDiagnosticsContext, String collectionRid, String partitionKeyRangeId, Map properties) { + Instant lookupStart = Instant.now(); Mono> routingMapObs = routingMapCache.getAsync( collectionRid, routingMap -> @@ -184,7 +218,9 @@ public Mono> tryGetRangeByPartitionKeyRange collectionRid, null, properties), - forceRefresh -> false).map(Utils.ValueHolder::new); + forceRefresh -> false) + .map(Utils.ValueHolder::new) + .doFinally(signal -> recordPartitionKeyRangeLookUp(metaDataDiagnosticsContext, lookupStart)); return routingMapObs.map(routingMapValueHolder -> new Utils.ValueHolder<>(routingMapValueHolder.v.getRangeByPartitionKeyRangeId(partitionKeyRangeId))) .onErrorResume(err -> { @@ -245,7 +281,6 @@ private Mono getRoutingMapForCollectionAsync( 
CollectionRoutingMap previousRoutingMap, Map properties) { - Instant pkRangesCallStartTime = Instant.now(); String previousChangeFeedIfNoneMatch = previousRoutingMap == null ? null : previousRoutingMap.getChangeFeedNextIfNoneMatch(); @@ -275,20 +310,23 @@ private Mono getRoutingMapForCollectionAsync( updateRoutingMap(collectionRid, previousRoutingMap, ranges, continuationToken.get()); return Mono.just(updatedMap); - }) - .doFinally(signal -> { - if (metaDataDiagnosticsContext != null) { - Instant pkRangesCallEndTime = Instant.now(); - MetadataDiagnosticsContext.MetadataDiagnostics metaDataDiagnostic = - new MetadataDiagnosticsContext.MetadataDiagnostics( - pkRangesCallStartTime, - pkRangesCallEndTime, - MetadataDiagnosticsContext.MetadataType.PARTITION_KEY_RANGE_LOOK_UP); - metaDataDiagnosticsContext.addMetaDataDiagnostic(metaDataDiagnostic); - } }); } + private static void recordPartitionKeyRangeLookUp( + MetadataDiagnosticsContext metaDataDiagnosticsContext, + Instant lookupStart) { + if (metaDataDiagnosticsContext == null) { + return; + } + MetadataDiagnosticsContext.MetadataDiagnostics diagnostic = + new MetadataDiagnosticsContext.MetadataDiagnostics( + lookupStart, + Instant.now(), + MetadataDiagnosticsContext.MetadataType.PARTITION_KEY_RANGE_LOOK_UP); + metaDataDiagnosticsContext.addMetaDataDiagnostic(diagnostic); + } + private CollectionRoutingMap updateRoutingMap( String collectionRid, CollectionRoutingMap previousRoutingMap, diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/caches/SharedPartitionKeyRangeCacheRegistry.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/caches/SharedPartitionKeyRangeCacheRegistry.java new file mode 100644 index 000000000000..c25d479092ce --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/caches/SharedPartitionKeyRangeCacheRegistry.java @@ -0,0 +1,172 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. 
// Licensed under the MIT License.
package com.azure.cosmos.implementation.caches;

import com.azure.core.util.ReferenceManager;
import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.routing.CollectionRoutingMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Process-wide registry of {@link AsyncCacheNonBlocking} instances holding
 * partition-key-range routing maps, keyed by service endpoint {@link URI}.
 * Multiple {@code CosmosAsyncClient} instances configured with the same
 * service endpoint share a single cache; the entry is refcounted and evicted
 * when the last caller releases.
 *
 * <p><strong>Scope of sharing.</strong> Two clients share only when their service
 * endpoints compare equal via {@link URI#equals(Object)} (which is
 * case-insensitive on host per RFC 3986). Clients configured with
 * different endpoint URIs — including a regional endpoint
 * versus the global endpoint of the same account — do not share.
 * This avoids the brittle alternatives (e.g. canonicalising
 * {@link com.azure.cosmos.implementation.DatabaseAccount#getId() account ids}
 * returned from regional endpoints, which embed a service-side normalised
 * region suffix and cannot be reliably reversed).</p>
 *
 * <p>An unreleased caller is cleaned up by registering a one-shot action with
 * {@link ReferenceManager#INSTANCE}; when the owner becomes phantom-reachable
 * the action decrements the refcount.</p>
 *
 * <p>Sharing can be disabled via system property
 * {@code COSMOS.SHARED_PARTITION_KEY_RANGE_CACHE_ENABLED=false}.</p>
 */
public final class SharedPartitionKeyRangeCacheRegistry {
    private static final Logger logger = LoggerFactory.getLogger(SharedPartitionKeyRangeCacheRegistry.class);

    private static final SharedPartitionKeyRangeCacheRegistry INSTANCE = new SharedPartitionKeyRangeCacheRegistry();

    // One refcounted entry per endpoint. All refcount changes happen inside
    // ConcurrentHashMap#compute so increment/decrement/evict are atomic per key.
    private final ConcurrentHashMap<URI, Entry> entries = new ConcurrentHashMap<>();

    private SharedPartitionKeyRangeCacheRegistry() {
    }

    /**
     * Returns the process-wide registry.
     *
     * @return the singleton instance
     */
    public static SharedPartitionKeyRangeCacheRegistry getInstance() {
        return INSTANCE;
    }

    /**
     * Returns the shared cache for {@code endpoint} (creating it if needed) and
     * bumps the refcount. Returns an isolated cache when {@code endpoint} is
     * {@code null} or sharing is disabled.
     *
     * <p>If {@code owner} is non-null, a deferred cleanup action is registered
     * so the refcount is decremented automatically if {@code owner} is GC'd
     * without calling {@link #release(URI, AsyncCacheNonBlocking, ReleaseHandle)}.</p>
     *
     * @param endpoint service endpoint used as the registry key; may be {@code null}
     * @param owner object whose reachability guards the deferred cleanup; may be {@code null}
     * @return the cache to use plus a release handle ({@code null} for isolated caches
     *         and when {@code owner} is {@code null})
     */
    public AcquireResult acquire(URI endpoint, Object owner) {
        if (endpoint == null || !Configs.isSharedPartitionKeyRangeCacheEnabled()) {
            // Isolated cache: never enters the registry, so there is nothing to release.
            return new AcquireResult(new AsyncCacheNonBlocking<>(), null);
        }

        Entry entry = entries.compute(endpoint, (key, existing) -> {
            if (existing == null) {
                Entry created = new Entry();
                created.refCount.set(1);
                logger.debug("Created shared partition key range cache for endpoint [{}]", key);
                return created;
            }
            existing.refCount.incrementAndGet();
            return existing;
        });

        ReleaseHandle handle = null;
        if (owner != null) {
            // The cleanup lambda MUST NOT capture `owner`, otherwise the owner can
            // never become phantom-reachable and the cleanup will never run.
            final URI capturedEndpoint = endpoint;
            final AsyncCacheNonBlocking<String, CollectionRoutingMap> capturedCache = entry.cache;
            final ReleaseHandle h = new ReleaseHandle();
            ReferenceManager.INSTANCE.register(owner, () -> {
                // fulfill() succeeds only if the prompt release path has not already run.
                if (h.fulfill()) {
                    logger.warn(
                        "Leaked RxPartitionKeyRangeCache for endpoint [{}] released by"
                            + " ReferenceManager; always close the CosmosClient to avoid this.",
                        capturedEndpoint);
                    release(capturedEndpoint, capturedCache);
                }
            });
            handle = h;
        }
        return new AcquireResult(entry.cache, handle);
    }

    /**
     * Prompt release path. Fulfils {@code handle} (so the deferred cleanup
     * becomes a no-op) and decrements the refcount. If the handle was already
     * fulfilled by the deferred cleanup, this call is a no-op.
     *
     * @param endpoint endpoint the cache was acquired for
     * @param cache cache instance returned by {@link #acquire(URI, Object)}
     * @param handle handle returned by {@link #acquire(URI, Object)}; when {@code null}
     *               the one-shot guard is skipped and the refcount path runs directly
     */
    public void release(URI endpoint,
                        AsyncCacheNonBlocking<String, CollectionRoutingMap> cache,
                        ReleaseHandle handle) {
        if (handle != null && !handle.fulfill()) {
            return;
        }
        release(endpoint, cache);
    }

    /**
     * Refcount decrement; evicts the entry at zero. Does nothing when either
     * argument is {@code null}, when no entry is registered for {@code endpoint},
     * or when the registered cache is not the {@code cache} instance supplied.
     *
     * @param endpoint endpoint the cache was acquired for
     * @param cache cache instance returned by {@link #acquire(URI, Object)}
     */
    public void release(URI endpoint, AsyncCacheNonBlocking<String, CollectionRoutingMap> cache) {
        if (endpoint == null || cache == null) {
            return;
        }

        entries.compute(endpoint, (key, existing) -> {
            // Identity check: a stale cache from an evicted entry must not
            // decrement the refcount of the entry that replaced it.
            if (existing == null || existing.cache != cache) {
                return existing;
            }
            int remaining = existing.refCount.decrementAndGet();
            if (remaining <= 0) {
                logger.debug("Evicting shared partition key range cache for endpoint [{}]", key);
                return null; // returning null from compute removes the mapping
            }
            return existing;
        });
    }

    /** Test-only. */
    int registeredEndpointCount() {
        return entries.size();
    }

    /** Test-only. */
    int referenceCount(URI endpoint) {
        Entry entry = entries.get(endpoint);
        return entry == null ? 0 : entry.refCount.get();
    }

    /** Result of {@link #acquire(URI, Object)}: the cache plus a release handle (null when isolated). */
    public static final class AcquireResult {
        public final AsyncCacheNonBlocking<String, CollectionRoutingMap> cache;
        public final ReleaseHandle releaseHandle;

        AcquireResult(AsyncCacheNonBlocking<String, CollectionRoutingMap> cache,
                      ReleaseHandle releaseHandle) {
            this.cache = cache;
            this.releaseHandle = releaseHandle;
        }
    }

    /**
     * One-shot CAS flag shared between prompt-close and deferred cleanup;
     * guarantees exactly one refcount decrement.
     */
    public static final class ReleaseHandle {
        private final AtomicBoolean fulfilled = new AtomicBoolean(false);

        /** Returns {@code true} for the first caller only; every later call returns {@code false}. */
        boolean fulfill() {
            return fulfilled.compareAndSet(false, true);
        }
    }

    /** Registry value: the shared cache and the number of outstanding acquires. */
    private static final class Entry {
        final AsyncCacheNonBlocking<String, CollectionRoutingMap> cache = new AsyncCacheNonBlocking<>();
        final AtomicInteger refCount = new AtomicInteger(0);
    }
}