diff --git a/sdk/cosmos/azure-cosmos-spark_3/pom.xml b/sdk/cosmos/azure-cosmos-spark_3/pom.xml
index 2405150dc171..0adafc3c90fd 100644
--- a/sdk/cosmos/azure-cosmos-spark_3/pom.xml
+++ b/sdk/cosmos/azure-cosmos-spark_3/pom.xml
@@ -429,6 +429,7 @@
${maven.compiler.source}
${maven.compiler.target}
${scala.version}
+ ${project.build.directory}/zinc-cache
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ClientConfigDiagnosticsTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ClientConfigDiagnosticsTest.java
index f1ea462ca638..b7a6908d4fe8 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ClientConfigDiagnosticsTest.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ClientConfigDiagnosticsTest.java
@@ -11,7 +11,6 @@
import com.azure.cosmos.CosmosRegionSwitchHint;
import com.azure.cosmos.SessionRetryOptions;
import com.azure.cosmos.SessionRetryOptionsBuilder;
-import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry;
import com.azure.cosmos.implementation.perPartitionCircuitBreaker.PartitionLevelCircuitBreakerConfig;
import com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient;
import com.azure.cosmos.implementation.guava25.collect.ImmutableList;
@@ -41,8 +40,6 @@
import static org.assertj.core.api.Assertions.assertThat;
public class ClientConfigDiagnosticsTest {
- private static final String vmInstanceMachineId = ClientTelemetry.getMachineId(null);
-
private final ObjectMapper objectMapper = new ObjectMapper();
private static final ImplementationBridgeHelpers.CosmosContainerIdentityHelper.CosmosContainerIdentityAccessor containerIdentityAccessor = ImplementationBridgeHelpers
.CosmosContainerIdentityHelper
@@ -168,7 +165,7 @@ public void bareMinimum() throws Exception {
ObjectNode objectNode = (ObjectNode) objectMapper.readTree(jsonWriter.toString());
assertThat(objectNode.get("id").asInt()).isEqualTo(1);
- assertThat(objectNode.get("machineId").asText()).isEqualTo(Strings.isNullOrEmpty(vmInstanceMachineId) ? machineId : vmInstanceMachineId);
+ assertThat(objectNode.get("machineId").asText()).isEqualTo(diagnosticsClientConfig.getMachineId());
assertThat(objectNode.get("numberOfClients").asInt()).isEqualTo(2);
assertThat(objectNode.get("consistencyCfg").asText()).isEqualTo("(consistency: null, readConsistencyStrategy: null, mm: false, prgns: [null])");
assertThat(objectNode.get("connCfg").get("rntbd").asText()).isEqualTo("null");
@@ -201,7 +198,7 @@ public void rntbd() throws Exception {
ObjectNode objectNode = (ObjectNode) objectMapper.readTree(jsonWriter.toString());
assertThat(objectNode.get("id").asInt()).isEqualTo(1);
- assertThat(objectNode.get("machineId").asText()).isEqualTo(Strings.isNullOrEmpty(vmInstanceMachineId) ? machineId : vmInstanceMachineId);
+ assertThat(objectNode.get("machineId").asText()).isEqualTo(diagnosticsClientConfig.getMachineId());
assertThat(objectNode.get("numberOfClients").asInt()).isEqualTo(2);
assertThat(objectNode.get("consistencyCfg").asText()).isEqualTo("(consistency: null, readConsistencyStrategy: null, mm: false, prgns: [null])");
assertThat(objectNode.get("connCfg").get("rntbd").asText()).isEqualTo("(cto:PT5S, nrto:PT5S, icto:PT0S, ieto:PT1H, mcpe:130, mrpc:30, cer:true)");
@@ -238,7 +235,7 @@ public void gw() throws Exception {
ObjectNode objectNode = (ObjectNode) objectMapper.readTree(jsonWriter.toString());
assertThat(objectNode.get("id").asInt()).isEqualTo(1);
- assertThat(objectNode.get("machineId").asText()).isEqualTo(Strings.isNullOrEmpty(vmInstanceMachineId) ? machineId : vmInstanceMachineId);
+ assertThat(objectNode.get("machineId").asText()).isEqualTo(diagnosticsClientConfig.getMachineId());
assertThat(objectNode.get("numberOfClients").asInt()).isEqualTo(2);
assertThat(objectNode.get("consistencyCfg").asText()).isEqualTo("(consistency: null, readConsistencyStrategy: null, mm: false, prgns: [null])");
assertThat(objectNode.get("connCfg").get("rntbd").asText()).isEqualTo("null");
@@ -312,7 +309,7 @@ public void full(
ObjectNode objectNode = (ObjectNode) objectMapper.readTree(jsonWriter.toString());
assertThat(objectNode.get("id").asInt()).isEqualTo(1);
- assertThat(objectNode.get("machineId").asText()).isEqualTo(Strings.isNullOrEmpty(vmInstanceMachineId) ? machineId : vmInstanceMachineId);
+ assertThat(objectNode.get("machineId").asText()).isEqualTo(diagnosticsClientConfig.getMachineId());
assertThat(objectNode.get("numberOfClients").asInt()).isEqualTo(2);
assertThat(objectNode.get("consistencyCfg").asText()).isEqualTo("(consistency: null, readConsistencyStrategy: null, mm: false, prgns: [westus1,westus2])");
assertThat(objectNode.get("connCfg").get("rntbd").asText()).isEqualTo("null");
@@ -365,7 +362,7 @@ public void sessionRetryOptionsInDiagnostics(SessionRetryOptions sessionRetryOpt
ObjectNode objectNode = (ObjectNode) objectMapper.readTree(jsonWriter.toString());
assertThat(objectNode.get("id").asInt()).isEqualTo(1);
- assertThat(objectNode.get("machineId").asText()).isEqualTo(Strings.isNullOrEmpty(vmInstanceMachineId) ? machineId : vmInstanceMachineId);
+ assertThat(objectNode.get("machineId").asText()).isEqualTo(diagnosticsClientConfig.getMachineId());
assertThat(objectNode.get("numberOfClients").asInt()).isEqualTo(2);
assertThat(objectNode.get("consistencyCfg").asText()).isEqualTo("(consistency: null, readConsistencyStrategy: null, mm: false, prgns: [null])");
assertThat(objectNode.get("connCfg").get("rntbd").asText()).isEqualTo("null");
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeThroughputControlConfigManagerTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeThroughputControlConfigManagerTests.java
index 566ff7043336..f06e9dbbb7bd 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeThroughputControlConfigManagerTests.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeThroughputControlConfigManagerTests.java
@@ -66,4 +66,70 @@ public void getThroughputControlConfigForFeedRange() {
assertThat(pkRangeThroughputControlConfig.getTargetThroughputThreshold()).isEqualTo(throughputControlGroupConfig.getTargetThroughputThreshold()/allLeases.size());
assertThat(pkRangeThroughputControlConfig.getPriorityLevel()).isEqualTo(throughputControlGroupConfig.getPriorityLevel());
}
+
+ @Test(groups = "unit")
+ public void getOrCreateThroughputControlConfigForFeedRange_withThroughputBucket() {
+
+ ThroughputControlGroupConfig throughputControlGroupConfig =
+ new ThroughputControlGroupConfigBuilder()
+ .groupName("test-" + UUID.randomUUID())
+ .priorityLevel(PriorityLevel.LOW)
+ .throughputBucket(2)
+ .build();
+
+ ChangeFeedContextClient documentClientMock = Mockito.mock(ChangeFeedContextClient.class);
+ CosmosAsyncContainer containerMock = Mockito.mock(CosmosAsyncContainer.class);
+ Mockito.doReturn(containerMock).when(documentClientMock).getContainerClient();
+ Mockito.doNothing().when(containerMock).enableServerThroughputControlGroup(Mockito.any());
+
+ FeedRangeThroughputControlConfigManager throughputControlConfigManager =
+ new FeedRangeThroughputControlConfigManager(throughputControlGroupConfig, documentClientMock);
+
+ FeedRangeEpkImpl feedRangeEpk = new FeedRangeEpkImpl(new Range<>("AA", "CC", true, false));
+
+ ThroughputControlGroupConfig pkRangeThroughputControlConfig =
+ throughputControlConfigManager.getOrCreateThroughputControlConfigForFeedRange(feedRangeEpk).block();
+
+ assertThat(pkRangeThroughputControlConfig).isNotNull();
+ assertThat(pkRangeThroughputControlConfig.getGroupName()).isEqualTo(throughputControlGroupConfig.getGroupName());
+ assertThat(pkRangeThroughputControlConfig.getPriorityLevel()).isEqualTo(throughputControlGroupConfig.getPriorityLevel());
+ assertThat(pkRangeThroughputControlConfig.getThroughputBucket()).isEqualTo(2);
+
+ Mockito.verify(containerMock, Mockito.times(1)).enableServerThroughputControlGroup(throughputControlGroupConfig);
+ Mockito.verify(containerMock, Mockito.never()).enableLocalThroughputControlGroup(Mockito.any());
+ }
+
+ @Test(groups = "unit")
+ public void getOrCreateThroughputControlConfigForFeedRange_withThroughputBucket_enableServerGroupFails() {
+
+ ThroughputControlGroupConfig throughputControlGroupConfig =
+ new ThroughputControlGroupConfigBuilder()
+ .groupName("test-" + UUID.randomUUID())
+ .priorityLevel(PriorityLevel.LOW)
+ .throughputBucket(2)
+ .build();
+
+ ChangeFeedContextClient documentClientMock = Mockito.mock(ChangeFeedContextClient.class);
+ CosmosAsyncContainer containerMock = Mockito.mock(CosmosAsyncContainer.class);
+ Mockito.doReturn(containerMock).when(documentClientMock).getContainerClient();
+ Mockito.doThrow(new IllegalStateException("test exception"))
+ .when(containerMock)
+ .enableServerThroughputControlGroup(Mockito.any());
+
+ FeedRangeThroughputControlConfigManager throughputControlConfigManager =
+ new FeedRangeThroughputControlConfigManager(throughputControlGroupConfig, documentClientMock);
+
+ FeedRangeEpkImpl feedRangeEpk = new FeedRangeEpkImpl(new Range<>("AA", "CC", true, false));
+
+ ThroughputControlGroupConfig pkRangeThroughputControlConfig =
+ throughputControlConfigManager.getOrCreateThroughputControlConfigForFeedRange(feedRangeEpk).block();
+
+ assertThat(pkRangeThroughputControlConfig).isNotNull();
+ assertThat(pkRangeThroughputControlConfig.getGroupName()).isEqualTo(throughputControlGroupConfig.getGroupName());
+ assertThat(pkRangeThroughputControlConfig.getPriorityLevel()).isEqualTo(throughputControlGroupConfig.getPriorityLevel());
+ assertThat(pkRangeThroughputControlConfig.getThroughputBucket()).isEqualTo(2);
+
+ Mockito.verify(containerMock, Mockito.times(1)).enableServerThroughputControlGroup(throughputControlGroupConfig);
+ Mockito.verify(containerMock, Mockito.never()).enableLocalThroughputControlGroup(Mockito.any());
+ }
}
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/pkversion/FeedRangeThroughputControlConfigManagerTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/pkversion/FeedRangeThroughputControlConfigManagerTests.java
index 115d060e0d06..d4fafa5cceb1 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/pkversion/FeedRangeThroughputControlConfigManagerTests.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/pkversion/FeedRangeThroughputControlConfigManagerTests.java
@@ -48,4 +48,68 @@ public void getThroughputControlConfigForFeedRange() {
assertThat(pkRangeThroughputControlConfig.getTargetThroughputThreshold()).isEqualTo(throughputControlGroupConfig.getTargetThroughputThreshold());
assertThat(pkRangeThroughputControlConfig.getPriorityLevel()).isEqualTo(throughputControlGroupConfig.getPriorityLevel());
}
+
+ @Test(groups = "unit")
+ public void getThroughputControlConfigForFeedRange_withThroughputBucket() {
+
+ ThroughputControlGroupConfig throughputControlGroupConfig =
+ new ThroughputControlGroupConfigBuilder()
+ .groupName("test-" + UUID.randomUUID())
+ .priorityLevel(PriorityLevel.LOW)
+ .throughputBucket(2)
+ .build();
+
+ ChangeFeedContextClient documentClientMock = Mockito.mock(ChangeFeedContextClient.class);
+ CosmosAsyncContainer containerMock = Mockito.mock(CosmosAsyncContainer.class);
+ Mockito.doReturn(containerMock).when(documentClientMock).getContainerClient();
+ Mockito.doNothing().when(containerMock).enableServerThroughputControlGroup(Mockito.any());
+
+ FeedRangeThroughputControlConfigManager throughputControlConfigManager =
+ new FeedRangeThroughputControlConfigManager(throughputControlGroupConfig, documentClientMock);
+
+ FeedRange feedRange = new FeedRangePartitionKeyRangeImpl("1");
+ ThroughputControlGroupConfig pkRangeThroughputControlConfig =
+ throughputControlConfigManager.getThroughputControlConfigForFeedRange(feedRange);
+
+ assertThat(pkRangeThroughputControlConfig).isNotNull();
+ assertThat(pkRangeThroughputControlConfig.getGroupName()).isEqualTo(throughputControlGroupConfig.getGroupName());
+ assertThat(pkRangeThroughputControlConfig.getPriorityLevel()).isEqualTo(throughputControlGroupConfig.getPriorityLevel());
+ assertThat(pkRangeThroughputControlConfig.getThroughputBucket()).isEqualTo(2);
+
+ Mockito.verify(containerMock, Mockito.times(1)).enableServerThroughputControlGroup(throughputControlGroupConfig);
+ Mockito.verify(containerMock, Mockito.never()).enableLocalThroughputControlGroup(Mockito.any());
+ }
+
+ @Test(groups = "unit")
+ public void getThroughputControlConfigForFeedRange_withThroughputBucket_enableServerGroupFails() {
+
+ ThroughputControlGroupConfig throughputControlGroupConfig =
+ new ThroughputControlGroupConfigBuilder()
+ .groupName("test-" + UUID.randomUUID())
+ .priorityLevel(PriorityLevel.LOW)
+ .throughputBucket(2)
+ .build();
+
+ ChangeFeedContextClient documentClientMock = Mockito.mock(ChangeFeedContextClient.class);
+ CosmosAsyncContainer containerMock = Mockito.mock(CosmosAsyncContainer.class);
+ Mockito.doReturn(containerMock).when(documentClientMock).getContainerClient();
+ Mockito.doThrow(new IllegalStateException("test exception"))
+ .when(containerMock)
+ .enableServerThroughputControlGroup(Mockito.any());
+
+ FeedRangeThroughputControlConfigManager throughputControlConfigManager =
+ new FeedRangeThroughputControlConfigManager(throughputControlGroupConfig, documentClientMock);
+
+ FeedRange feedRange = new FeedRangePartitionKeyRangeImpl("1");
+ ThroughputControlGroupConfig pkRangeThroughputControlConfig =
+ throughputControlConfigManager.getThroughputControlConfigForFeedRange(feedRange);
+
+ assertThat(pkRangeThroughputControlConfig).isNotNull();
+ assertThat(pkRangeThroughputControlConfig.getGroupName()).isEqualTo(throughputControlGroupConfig.getGroupName());
+ assertThat(pkRangeThroughputControlConfig.getPriorityLevel()).isEqualTo(throughputControlGroupConfig.getPriorityLevel());
+ assertThat(pkRangeThroughputControlConfig.getThroughputBucket()).isEqualTo(2);
+
+ Mockito.verify(containerMock, Mockito.times(1)).enableServerThroughputControlGroup(throughputControlGroupConfig);
+ Mockito.verify(containerMock, Mockito.never()).enableLocalThroughputControlGroup(Mockito.any());
+ }
}
diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md
index eee4f87437f9..d7ed7c883c6b 100644
--- a/sdk/cosmos/azure-cosmos/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md
@@ -3,6 +3,7 @@
### 4.82.0-beta.1 (Unreleased)
#### Features Added
+* Added throughput bucket support for Change Feed Processor feed polling throughput control. - See [Issue 49487](https://github.com/Azure/azure-sdk-for-java/issues/49487)
#### Breaking Changes
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeThroughputControlConfigManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeThroughputControlConfigManager.java
index 636f99ba6581..838dbb455618 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeThroughputControlConfigManager.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/FeedRangeThroughputControlConfigManager.java
@@ -19,6 +19,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -34,6 +35,7 @@ public class FeedRangeThroughputControlConfigManager {
private final AtomicReference> leaseTokens; // epk leases
private final Map> pkRangeToFeedRangeMap;
private final Map feedRangeToThroughputControlGroupConfigMap;
+ private final AtomicBoolean throughputControlGroupEnabled;
public FeedRangeThroughputControlConfigManager(
ThroughputControlGroupConfig throughputControlGroupConfig,
@@ -47,6 +49,7 @@ public FeedRangeThroughputControlConfigManager(
this.leaseTokens = new AtomicReference<>();
this.pkRangeToFeedRangeMap = new ConcurrentHashMap<>();
this.feedRangeToThroughputControlGroupConfigMap = new ConcurrentHashMap<>();
+ this.throughputControlGroupEnabled = new AtomicBoolean(false);
}
/**
@@ -88,6 +91,23 @@ public Mono refresh(List leases) {
public Mono getOrCreateThroughputControlConfigForFeedRange(FeedRangeEpkImpl feedRange) {
checkNotNull(feedRange, "Argument 'feedRange' can not be null");
+
+ if (this.throughputControlGroupConfig.getThroughputBucket() != null) {
+ return Mono.defer(() -> {
+ if (this.throughputControlGroupEnabled.compareAndSet(false, true)) {
+ try {
+ this.documentClient.getContainerClient()
+ .enableServerThroughputControlGroup(this.throughputControlGroupConfig);
+ } catch (Exception ex) {
+ logger.warn(
+ "Enable server throughput control group failed, continuing without throughput control.",
+ ex);
+ }
+ }
+ return Mono.just(this.throughputControlGroupConfig);
+ });
+ }
+
ThroughputControlGroupConfig throughputControlGroupConfigForFeedRange =
this.feedRangeToThroughputControlGroupConfigMap.get(feedRange);
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/FeedRangeThroughputControlConfigManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/FeedRangeThroughputControlConfigManager.java
index f0ebbe1db4af..923179477de3 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/FeedRangeThroughputControlConfigManager.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/FeedRangeThroughputControlConfigManager.java
@@ -6,6 +6,8 @@
import com.azure.cosmos.ThroughputControlGroupConfig;
import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
import com.azure.cosmos.models.FeedRange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -14,6 +16,7 @@
// Only used in CFP when customer configure throughput control config
// The main purpose of this class is to create corresponding throughput control group for each feed range
public class FeedRangeThroughputControlConfigManager {
+ private static final Logger logger = LoggerFactory.getLogger(FeedRangeThroughputControlConfigManager.class);
private final ThroughputControlGroupConfig throughputControlGroupConfig;
private final ChangeFeedContextClient documentClient;
@@ -36,12 +39,24 @@ public ThroughputControlGroupConfig getThroughputControlConfigForFeedRange(FeedR
// for pkRange leases, it has only been used to support for split
// the lease feed range and partition key range is always a 1:1 mapping
- // throughput control internally will divide the target RU across all pk ranges
- // so all pk ranges can use the same local throughput control group
+ // for local throughput control, throughput control internally will divide the target RU across all pk ranges
+ // so all pk ranges can use the same local throughput control group. Throughput bucket uses server
+ // throughput control and can also share the same group across all pk ranges.
// Note: if global throughput control be added in future, then we will need to create one group per pkRange
if (this.throughputControlGroupEnabled.compareAndSet(false, true)) {
- this.documentClient.getContainerClient().enableLocalThroughputControlGroup(this.throughputControlGroupConfig);
+ if (this.throughputControlGroupConfig.getThroughputBucket() != null) {
+ try {
+ this.documentClient.getContainerClient()
+ .enableServerThroughputControlGroup(this.throughputControlGroupConfig);
+ } catch (Exception ex) {
+ logger.warn(
+ "Enable server throughput control group failed, continuing without throughput control.",
+ ex);
+ }
+ } else {
+ this.documentClient.getContainerClient().enableLocalThroughputControlGroup(this.throughputControlGroupConfig);
+ }
}
return this.throughputControlGroupConfig;