From c1954035f2f73d95ece8467625bfc331817a30b2 Mon Sep 17 00:00:00 2001 From: Arnab Nandy Date: Mon, 22 Jun 2026 01:34:02 +0530 Subject: [PATCH 1/5] Add throughput bucket support for Change Feed Processor feed polling --- ...geThroughputControlConfigManagerTests.java | 32 +++++++++++++++++++ ...geThroughputControlConfigManagerTests.java | 31 ++++++++++++++++++ sdk/cosmos/azure-cosmos/CHANGELOG.md | 1 + ...edRangeThroughputControlConfigManager.java | 11 +++++++ ...edRangeThroughputControlConfigManager.java | 6 +++- 5 files changed, 80 insertions(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeThroughputControlConfigManagerTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeThroughputControlConfigManagerTests.java index 566ff7043336..1cc753e834a6 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeThroughputControlConfigManagerTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeThroughputControlConfigManagerTests.java @@ -66,4 +66,36 @@ public void getThroughputControlConfigForFeedRange() { assertThat(pkRangeThroughputControlConfig.getTargetThroughputThreshold()).isEqualTo(throughputControlGroupConfig.getTargetThroughputThreshold()/allLeases.size()); assertThat(pkRangeThroughputControlConfig.getPriorityLevel()).isEqualTo(throughputControlGroupConfig.getPriorityLevel()); } + + @Test(groups = "unit") + public void getOrCreateThroughputControlConfigForFeedRange_withThroughputBucket() { + + ThroughputControlGroupConfig throughputControlGroupConfig = + new ThroughputControlGroupConfigBuilder() + .groupName("test-" + UUID.randomUUID()) + .priorityLevel(PriorityLevel.LOW) + .throughputBucket(2) + .build(); + + ChangeFeedContextClient documentClientMock = Mockito.mock(ChangeFeedContextClient.class); + CosmosAsyncContainer containerMock = Mockito.mock(CosmosAsyncContainer.class); + Mockito.doReturn(containerMock).when(documentClientMock).getContainerClient(); + Mockito.doNothing().when(containerMock).enableServerThroughputControlGroup(Mockito.any()); + + FeedRangeThroughputControlConfigManager throughputControlConfigManager = + new FeedRangeThroughputControlConfigManager(throughputControlGroupConfig, documentClientMock); + + FeedRangeEpkImpl feedRangeEpk = new FeedRangeEpkImpl(new Range<>("AA", "CC", true, false)); + + ThroughputControlGroupConfig pkRangeThroughputControlConfig = + throughputControlConfigManager.getOrCreateThroughputControlConfigForFeedRange(feedRangeEpk).block(); + + assertThat(pkRangeThroughputControlConfig).isNotNull(); + assertThat(pkRangeThroughputControlConfig.getGroupName()).isEqualTo(throughputControlGroupConfig.getGroupName()); + assertThat(pkRangeThroughputControlConfig.getPriorityLevel()).isEqualTo(throughputControlGroupConfig.getPriorityLevel()); + assertThat(pkRangeThroughputControlConfig.getThroughputBucket()).isEqualTo(2); + + Mockito.verify(containerMock, Mockito.times(1)).enableServerThroughputControlGroup(throughputControlGroupConfig); + Mockito.verify(containerMock, Mockito.never()).enableLocalThroughputControlGroup(Mockito.any()); + } } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/pkversion/FeedRangeThroughputControlConfigManagerTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/pkversion/FeedRangeThroughputControlConfigManagerTests.java index 115d060e0d06..4331a7f074c7 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/pkversion/FeedRangeThroughputControlConfigManagerTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/pkversion/FeedRangeThroughputControlConfigManagerTests.java @@ -48,4 +48,35 @@ public void getThroughputControlConfigForFeedRange() { assertThat(pkRangeThroughputControlConfig.getTargetThroughputThreshold()).isEqualTo(throughputControlGroupConfig.getTargetThroughputThreshold()); assertThat(pkRangeThroughputControlConfig.getPriorityLevel()).isEqualTo(throughputControlGroupConfig.getPriorityLevel()); } + + @Test(groups = "unit") + public void getThroughputControlConfigForFeedRange_withThroughputBucket() { + + ThroughputControlGroupConfig throughputControlGroupConfig = + new ThroughputControlGroupConfigBuilder() + .groupName("test-" + UUID.randomUUID()) + .priorityLevel(PriorityLevel.LOW) + .throughputBucket(2) + .build(); + + ChangeFeedContextClient documentClientMock = Mockito.mock(ChangeFeedContextClient.class); + CosmosAsyncContainer containerMock = Mockito.mock(CosmosAsyncContainer.class); + Mockito.doReturn(containerMock).when(documentClientMock).getContainerClient(); + Mockito.doNothing().when(containerMock).enableServerThroughputControlGroup(Mockito.any()); + + FeedRangeThroughputControlConfigManager throughputControlConfigManager = + new FeedRangeThroughputControlConfigManager(throughputControlGroupConfig, documentClientMock); + + FeedRange feedRange = new FeedRangePartitionKeyRangeImpl("1"); + ThroughputControlGroupConfig pkRangeThroughputControlConfig = + throughputControlConfigManager.getThroughputControlConfigForFeedRange(feedRange); + + assertThat(pkRangeThroughputControlConfig).isNotNull(); + assertThat(pkRangeThroughputControlConfig.getGroupName()).isEqualTo(throughputControlGroupConfig.getGroupName()); + assertThat(pkRangeThroughputControlConfig.getPriorityLevel()).isEqualTo(throughputControlGroupConfig.getPriorityLevel()); + assertThat(pkRangeThroughputControlConfig.getThroughputBucket()).isEqualTo(2); + + Mockito.verify(containerMock, Mockito.times(1)).enableServerThroughputControlGroup(throughputControlGroupConfig); + Mockito.verify(containerMock, Mockito.never()).enableLocalThroughputControlGroup(Mockito.any()); + } } diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index eee4f87437f9..d7ed7c883c6b 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -3,6 +3,7 @@ ### 4.82.0-beta.1 (Unreleased) #### Features Added +* Added throughput bucket support for Change Feed Processor feed polling throughput control. - See [Issue 49487](https://github.com/Azure/azure-sdk-for-java/issues/49487) #### Breaking Changes diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeThroughputControlConfigManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeThroughputControlConfigManager.java index 636f99ba6581..8725e7a5516c 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeThroughputControlConfigManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeThroughputControlConfigManager.java @@ -19,6 +19,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -34,6 +35,7 @@ public class FeedRangeThroughputControlConfigManager { private final AtomicReference> leaseTokens; // epk leases private final Map> pkRangeToFeedRangeMap; private final Map feedRangeToThroughputControlGroupConfigMap; + private final AtomicBoolean throughputControlGroupEnabled; public FeedRangeThroughputControlConfigManager( ThroughputControlGroupConfig throughputControlGroupConfig, @@ -47,6 +49,7 @@ public FeedRangeThroughputControlConfigManager( this.leaseTokens = new AtomicReference<>(); this.pkRangeToFeedRangeMap = new ConcurrentHashMap<>(); this.feedRangeToThroughputControlGroupConfigMap = new ConcurrentHashMap<>(); + this.throughputControlGroupEnabled = new AtomicBoolean(false); } /** @@ -88,6 +91,14 @@ public Mono refresh(List leases) { public Mono getOrCreateThroughputControlConfigForFeedRange(FeedRangeEpkImpl feedRange) { checkNotNull(feedRange, "Argument 'feedRange' can not be null"); + + if (this.throughputControlGroupConfig.getThroughputBucket() != null) { + if (this.throughputControlGroupEnabled.compareAndSet(false, true)) { + this.documentClient.getContainerClient().enableServerThroughputControlGroup(this.throughputControlGroupConfig); + } + return Mono.just(this.throughputControlGroupConfig); + } + ThroughputControlGroupConfig throughputControlGroupConfigForFeedRange = this.feedRangeToThroughputControlGroupConfigMap.get(feedRange); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/FeedRangeThroughputControlConfigManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/FeedRangeThroughputControlConfigManager.java index f0ebbe1db4af..1d3120c39f6e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/FeedRangeThroughputControlConfigManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/FeedRangeThroughputControlConfigManager.java @@ -41,7 +41,11 @@ public ThroughputControlGroupConfig getThroughputControlConfigForFeedRange(FeedR // Note: if global throughput control be added in future, then we will need to create one group per pkRange if (this.throughputControlGroupEnabled.compareAndSet(false, true)) { - this.documentClient.getContainerClient().enableLocalThroughputControlGroup(this.throughputControlGroupConfig); + if (this.throughputControlGroupConfig.getThroughputBucket() != null) { + this.documentClient.getContainerClient().enableServerThroughputControlGroup(this.throughputControlGroupConfig); + } else { + this.documentClient.getContainerClient().enableLocalThroughputControlGroup(this.throughputControlGroupConfig); + } } return this.throughputControlGroupConfig; From 07648fca32a43a6a58a324ebc9879539ce843b86 Mon Sep 17 00:00:00 2001 From: Arnab Nandy Date: Mon, 22 Jun 2026 22:25:15 +0530 Subject: [PATCH 2/5] Fix Cosmos diagnostics machine ID test expectations --- .../implementation/ClientConfigDiagnosticsTest.java | 13 +++++-------- .../FeedRangeThroughputControlConfigManager.java | 5 +++-- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ClientConfigDiagnosticsTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ClientConfigDiagnosticsTest.java index f1ea462ca638..b7a6908d4fe8 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ClientConfigDiagnosticsTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ClientConfigDiagnosticsTest.java @@ -11,7 +11,6 @@ import com.azure.cosmos.CosmosRegionSwitchHint; import com.azure.cosmos.SessionRetryOptions; import com.azure.cosmos.SessionRetryOptionsBuilder; -import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry; import com.azure.cosmos.implementation.perPartitionCircuitBreaker.PartitionLevelCircuitBreakerConfig; import com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient; import com.azure.cosmos.implementation.guava25.collect.ImmutableList; @@ -41,8 +40,6 @@ import static org.assertj.core.api.Assertions.assertThat; public class ClientConfigDiagnosticsTest { - private static final String vmInstanceMachineId = ClientTelemetry.getMachineId(null); - private final ObjectMapper objectMapper = new ObjectMapper(); private static final ImplementationBridgeHelpers.CosmosContainerIdentityHelper.CosmosContainerIdentityAccessor containerIdentityAccessor = ImplementationBridgeHelpers .CosmosContainerIdentityHelper @@ -168,7 +165,7 @@ public void bareMinimum() throws Exception { ObjectNode objectNode = (ObjectNode) objectMapper.readTree(jsonWriter.toString()); assertThat(objectNode.get("id").asInt()).isEqualTo(1); - assertThat(objectNode.get("machineId").asText()).isEqualTo(Strings.isNullOrEmpty(vmInstanceMachineId) ? machineId : vmInstanceMachineId); + assertThat(objectNode.get("machineId").asText()).isEqualTo(diagnosticsClientConfig.getMachineId()); assertThat(objectNode.get("numberOfClients").asInt()).isEqualTo(2); assertThat(objectNode.get("consistencyCfg").asText()).isEqualTo("(consistency: null, readConsistencyStrategy: null, mm: false, prgns: [null])"); assertThat(objectNode.get("connCfg").get("rntbd").asText()).isEqualTo("null"); @@ -201,7 +198,7 @@ public void rntbd() throws Exception { ObjectNode objectNode = (ObjectNode) objectMapper.readTree(jsonWriter.toString()); assertThat(objectNode.get("id").asInt()).isEqualTo(1); - assertThat(objectNode.get("machineId").asText()).isEqualTo(Strings.isNullOrEmpty(vmInstanceMachineId) ? machineId : vmInstanceMachineId); + assertThat(objectNode.get("machineId").asText()).isEqualTo(diagnosticsClientConfig.getMachineId()); assertThat(objectNode.get("numberOfClients").asInt()).isEqualTo(2); assertThat(objectNode.get("consistencyCfg").asText()).isEqualTo("(consistency: null, readConsistencyStrategy: null, mm: false, prgns: [null])"); assertThat(objectNode.get("connCfg").get("rntbd").asText()).isEqualTo("(cto:PT5S, nrto:PT5S, icto:PT0S, ieto:PT1H, mcpe:130, mrpc:30, cer:true)"); @@ -238,7 +235,7 @@ public void gw() throws Exception { ObjectNode objectNode = (ObjectNode) objectMapper.readTree(jsonWriter.toString()); assertThat(objectNode.get("id").asInt()).isEqualTo(1); - assertThat(objectNode.get("machineId").asText()).isEqualTo(Strings.isNullOrEmpty(vmInstanceMachineId) ? machineId : vmInstanceMachineId); + assertThat(objectNode.get("machineId").asText()).isEqualTo(diagnosticsClientConfig.getMachineId()); assertThat(objectNode.get("numberOfClients").asInt()).isEqualTo(2); assertThat(objectNode.get("consistencyCfg").asText()).isEqualTo("(consistency: null, readConsistencyStrategy: null, mm: false, prgns: [null])"); assertThat(objectNode.get("connCfg").get("rntbd").asText()).isEqualTo("null"); @@ -312,7 +309,7 @@ public void full( ObjectNode objectNode = (ObjectNode) objectMapper.readTree(jsonWriter.toString()); assertThat(objectNode.get("id").asInt()).isEqualTo(1); - assertThat(objectNode.get("machineId").asText()).isEqualTo(Strings.isNullOrEmpty(vmInstanceMachineId) ? machineId : vmInstanceMachineId); + assertThat(objectNode.get("machineId").asText()).isEqualTo(diagnosticsClientConfig.getMachineId()); assertThat(objectNode.get("numberOfClients").asInt()).isEqualTo(2); assertThat(objectNode.get("consistencyCfg").asText()).isEqualTo("(consistency: null, readConsistencyStrategy: null, mm: false, prgns: [westus1,westus2])"); assertThat(objectNode.get("connCfg").get("rntbd").asText()).isEqualTo("null"); @@ -365,7 +362,7 @@ public void sessionRetryOptionsInDiagnostics(SessionRetryOptions sessionRetryOpt ObjectNode objectNode = (ObjectNode) objectMapper.readTree(jsonWriter.toString()); assertThat(objectNode.get("id").asInt()).isEqualTo(1); - assertThat(objectNode.get("machineId").asText()).isEqualTo(Strings.isNullOrEmpty(vmInstanceMachineId) ? machineId : vmInstanceMachineId); + assertThat(objectNode.get("machineId").asText()).isEqualTo(diagnosticsClientConfig.getMachineId()); assertThat(objectNode.get("numberOfClients").asInt()).isEqualTo(2); assertThat(objectNode.get("consistencyCfg").asText()).isEqualTo("(consistency: null, readConsistencyStrategy: null, mm: false, prgns: [null])"); assertThat(objectNode.get("connCfg").get("rntbd").asText()).isEqualTo("null"); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/FeedRangeThroughputControlConfigManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/FeedRangeThroughputControlConfigManager.java index 1d3120c39f6e..2f66b04f4875 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/FeedRangeThroughputControlConfigManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/FeedRangeThroughputControlConfigManager.java @@ -36,8 +36,9 @@ public ThroughputControlGroupConfig getThroughputControlConfigForFeedRange(FeedR // for pkRange leases, it has only been used to support for split // the lease feed range and partition key range is always a 1:1 mapping - // throughput control internally will divide the target RU across all pk ranges - // so all pk ranges can use the same local throughput control group + // for local throughput control, throughput control internally will divide the target RU across all pk ranges + // so all pk ranges can use the same local throughput control group. Throughput bucket uses server + // throughput control and can also share the same group across all pk ranges. // Note: if global throughput control be added in future, then we will need to create one group per pkRange if (this.throughputControlGroupEnabled.compareAndSet(false, true)) { From 6f2a21832225902f411dc2333cfc80a32ab28373 Mon Sep 17 00:00:00 2001 From: Arnab Nandy Date: Mon, 22 Jun 2026 22:38:26 +0530 Subject: [PATCH 3/5] Defer CFP throughput bucket server group initialization --- ...geThroughputControlConfigManagerTests.java | 34 +++++++++++++++++++ ...edRangeThroughputControlConfigManager.java | 17 +++++++--- 2 files changed, 47 insertions(+), 4 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeThroughputControlConfigManagerTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeThroughputControlConfigManagerTests.java index 1cc753e834a6..f06e9dbbb7bd 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeThroughputControlConfigManagerTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeThroughputControlConfigManagerTests.java @@ -98,4 +98,38 @@ public void getOrCreateThroughputControlConfigForFeedRange_withThroughputBucket( Mockito.verify(containerMock, Mockito.times(1)).enableServerThroughputControlGroup(throughputControlGroupConfig); Mockito.verify(containerMock, Mockito.never()).enableLocalThroughputControlGroup(Mockito.any()); } + + @Test(groups = "unit") + public void getOrCreateThroughputControlConfigForFeedRange_withThroughputBucket_enableServerGroupFails() { + + ThroughputControlGroupConfig throughputControlGroupConfig = + new ThroughputControlGroupConfigBuilder() + .groupName("test-" + UUID.randomUUID()) + .priorityLevel(PriorityLevel.LOW) + .throughputBucket(2) + .build(); + + ChangeFeedContextClient documentClientMock = Mockito.mock(ChangeFeedContextClient.class); + CosmosAsyncContainer containerMock = Mockito.mock(CosmosAsyncContainer.class); + Mockito.doReturn(containerMock).when(documentClientMock).getContainerClient(); + Mockito.doThrow(new IllegalStateException("test exception")) + .when(containerMock) + .enableServerThroughputControlGroup(Mockito.any()); + + FeedRangeThroughputControlConfigManager throughputControlConfigManager = + new FeedRangeThroughputControlConfigManager(throughputControlGroupConfig, documentClientMock); + + FeedRangeEpkImpl feedRangeEpk = new FeedRangeEpkImpl(new Range<>("AA", "CC", true, false)); + + ThroughputControlGroupConfig pkRangeThroughputControlConfig = + throughputControlConfigManager.getOrCreateThroughputControlConfigForFeedRange(feedRangeEpk).block(); + + assertThat(pkRangeThroughputControlConfig).isNotNull(); + assertThat(pkRangeThroughputControlConfig.getGroupName()).isEqualTo(throughputControlGroupConfig.getGroupName()); + assertThat(pkRangeThroughputControlConfig.getPriorityLevel()).isEqualTo(throughputControlGroupConfig.getPriorityLevel()); + assertThat(pkRangeThroughputControlConfig.getThroughputBucket()).isEqualTo(2); + + Mockito.verify(containerMock, Mockito.times(1)).enableServerThroughputControlGroup(throughputControlGroupConfig); + Mockito.verify(containerMock, Mockito.never()).enableLocalThroughputControlGroup(Mockito.any()); + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeThroughputControlConfigManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeThroughputControlConfigManager.java index 8725e7a5516c..838dbb455618 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeThroughputControlConfigManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeThroughputControlConfigManager.java @@ -93,10 +93,19 @@ public Mono getOrCreateThroughputControlConfigForF checkNotNull(feedRange, "Argument 'feedRange' can not be null"); if (this.throughputControlGroupConfig.getThroughputBucket() != null) { - if (this.throughputControlGroupEnabled.compareAndSet(false, true)) { - this.documentClient.getContainerClient().enableServerThroughputControlGroup(this.throughputControlGroupConfig); - } - return Mono.just(this.throughputControlGroupConfig); + return Mono.defer(() -> { + if (this.throughputControlGroupEnabled.compareAndSet(false, true)) { + try { + this.documentClient.getContainerClient() + .enableServerThroughputControlGroup(this.throughputControlGroupConfig); + } catch (Exception ex) { + logger.warn( + "Enable server throughput control group failed, continuing without throughput control.", + ex); + } + } + return Mono.just(this.throughputControlGroupConfig); + }); } ThroughputControlGroupConfig throughputControlGroupConfigForFeedRange = From d0a77d324dd99378a7a308155a597154128ad9a8 Mon Sep 17 00:00:00 2001 From: Arnab Nandy Date: Mon, 22 Jun 2026 22:45:51 +0530 Subject: [PATCH 4/5] Addressed the PK review comment too. --- ...geThroughputControlConfigManagerTests.java | 33 +++++++++++++++++++ ...edRangeThroughputControlConfigManager.java | 12 ++++++- 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/pkversion/FeedRangeThroughputControlConfigManagerTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/pkversion/FeedRangeThroughputControlConfigManagerTests.java index 4331a7f074c7..d4fafa5cceb1 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/pkversion/FeedRangeThroughputControlConfigManagerTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/pkversion/FeedRangeThroughputControlConfigManagerTests.java @@ -79,4 +79,37 @@ public void getThroughputControlConfigForFeedRange_withThroughputBucket() { Mockito.verify(containerMock, Mockito.times(1)).enableServerThroughputControlGroup(throughputControlGroupConfig); Mockito.verify(containerMock, Mockito.never()).enableLocalThroughputControlGroup(Mockito.any()); } + + @Test(groups = "unit") + public void getThroughputControlConfigForFeedRange_withThroughputBucket_enableServerGroupFails() { + + ThroughputControlGroupConfig throughputControlGroupConfig = + new ThroughputControlGroupConfigBuilder() + .groupName("test-" + UUID.randomUUID()) + .priorityLevel(PriorityLevel.LOW) + .throughputBucket(2) + .build(); + + ChangeFeedContextClient documentClientMock = Mockito.mock(ChangeFeedContextClient.class); + CosmosAsyncContainer containerMock = Mockito.mock(CosmosAsyncContainer.class); + Mockito.doReturn(containerMock).when(documentClientMock).getContainerClient(); + Mockito.doThrow(new IllegalStateException("test exception")) + .when(containerMock) + .enableServerThroughputControlGroup(Mockito.any()); + + FeedRangeThroughputControlConfigManager throughputControlConfigManager = + new FeedRangeThroughputControlConfigManager(throughputControlGroupConfig, documentClientMock); + + FeedRange feedRange = new FeedRangePartitionKeyRangeImpl("1"); + ThroughputControlGroupConfig pkRangeThroughputControlConfig = + throughputControlConfigManager.getThroughputControlConfigForFeedRange(feedRange); + + assertThat(pkRangeThroughputControlConfig).isNotNull(); + assertThat(pkRangeThroughputControlConfig.getGroupName()).isEqualTo(throughputControlGroupConfig.getGroupName()); + assertThat(pkRangeThroughputControlConfig.getPriorityLevel()).isEqualTo(throughputControlGroupConfig.getPriorityLevel()); + assertThat(pkRangeThroughputControlConfig.getThroughputBucket()).isEqualTo(2); + + Mockito.verify(containerMock, Mockito.times(1)).enableServerThroughputControlGroup(throughputControlGroupConfig); + Mockito.verify(containerMock, Mockito.never()).enableLocalThroughputControlGroup(Mockito.any()); + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/FeedRangeThroughputControlConfigManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/FeedRangeThroughputControlConfigManager.java index 2f66b04f4875..923179477de3 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/FeedRangeThroughputControlConfigManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/FeedRangeThroughputControlConfigManager.java @@ -6,6 +6,8 @@ import com.azure.cosmos.ThroughputControlGroupConfig; import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient; import com.azure.cosmos.models.FeedRange; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.concurrent.atomic.AtomicBoolean; @@ -14,6 +16,7 @@ // Only used in CFP when customer configure throughput control config // The main purpose of this class is to create corresponding throughput control group for each feed range public class FeedRangeThroughputControlConfigManager { + private static final Logger logger = LoggerFactory.getLogger(FeedRangeThroughputControlConfigManager.class); private final ThroughputControlGroupConfig throughputControlGroupConfig; private final ChangeFeedContextClient documentClient; @@ -43,7 +46,14 @@ public ThroughputControlGroupConfig getThroughputControlConfigForFeedRange(FeedR if (this.throughputControlGroupEnabled.compareAndSet(false, true)) { if (this.throughputControlGroupConfig.getThroughputBucket() != null) { - this.documentClient.getContainerClient().enableServerThroughputControlGroup(this.throughputControlGroupConfig); + try { + this.documentClient.getContainerClient() + .enableServerThroughputControlGroup(this.throughputControlGroupConfig); + } catch (Exception ex) { + logger.warn( + "Enable server throughput control group failed, continuing without throughput control.", + ex); + } } else { this.documentClient.getContainerClient().enableLocalThroughputControlGroup(this.throughputControlGroupConfig); } From cb1550e861273cd9e07838ce17db40923923cfb8 Mon Sep 17 00:00:00 2001 From: Arnab Nandy Date: Tue, 23 Jun 2026 00:36:49 +0530 Subject: [PATCH 5/5] Isolate Spark Zinc compiler cache per module --- sdk/cosmos/azure-cosmos-spark_3/pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/cosmos/azure-cosmos-spark_3/pom.xml b/sdk/cosmos/azure-cosmos-spark_3/pom.xml index 2405150dc171..0adafc3c90fd 100644 --- a/sdk/cosmos/azure-cosmos-spark_3/pom.xml +++ b/sdk/cosmos/azure-cosmos-spark_3/pom.xml @@ -429,6 +429,7 @@ ${maven.compiler.source} ${maven.compiler.target} ${scala.version} + ${project.build.directory}/zinc-cache