Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,7 @@
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
<scalaVersion>${scala.version}</scalaVersion>
<secondaryCacheDir>${project.build.directory}/zinc-cache</secondaryCacheDir>
</configuration>
<executions>
<execution>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import com.azure.cosmos.CosmosRegionSwitchHint;
import com.azure.cosmos.SessionRetryOptions;
import com.azure.cosmos.SessionRetryOptionsBuilder;
import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry;
import com.azure.cosmos.implementation.perPartitionCircuitBreaker.PartitionLevelCircuitBreakerConfig;
import com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient;
import com.azure.cosmos.implementation.guava25.collect.ImmutableList;
Expand Down Expand Up @@ -41,8 +40,6 @@
import static org.assertj.core.api.Assertions.assertThat;

public class ClientConfigDiagnosticsTest {
private static final String vmInstanceMachineId = ClientTelemetry.getMachineId(null);

private final ObjectMapper objectMapper = new ObjectMapper();
private static final ImplementationBridgeHelpers.CosmosContainerIdentityHelper.CosmosContainerIdentityAccessor containerIdentityAccessor = ImplementationBridgeHelpers
.CosmosContainerIdentityHelper
Expand Down Expand Up @@ -168,7 +165,7 @@ public void bareMinimum() throws Exception {
ObjectNode objectNode = (ObjectNode) objectMapper.readTree(jsonWriter.toString());

assertThat(objectNode.get("id").asInt()).isEqualTo(1);
assertThat(objectNode.get("machineId").asText()).isEqualTo(Strings.isNullOrEmpty(vmInstanceMachineId) ? machineId : vmInstanceMachineId);
assertThat(objectNode.get("machineId").asText()).isEqualTo(diagnosticsClientConfig.getMachineId());
assertThat(objectNode.get("numberOfClients").asInt()).isEqualTo(2);
assertThat(objectNode.get("consistencyCfg").asText()).isEqualTo("(consistency: null, readConsistencyStrategy: null, mm: false, prgns: [null])");
assertThat(objectNode.get("connCfg").get("rntbd").asText()).isEqualTo("null");
Expand Down Expand Up @@ -201,7 +198,7 @@ public void rntbd() throws Exception {
ObjectNode objectNode = (ObjectNode) objectMapper.readTree(jsonWriter.toString());

assertThat(objectNode.get("id").asInt()).isEqualTo(1);
assertThat(objectNode.get("machineId").asText()).isEqualTo(Strings.isNullOrEmpty(vmInstanceMachineId) ? machineId : vmInstanceMachineId);
assertThat(objectNode.get("machineId").asText()).isEqualTo(diagnosticsClientConfig.getMachineId());
assertThat(objectNode.get("numberOfClients").asInt()).isEqualTo(2);
assertThat(objectNode.get("consistencyCfg").asText()).isEqualTo("(consistency: null, readConsistencyStrategy: null, mm: false, prgns: [null])");
assertThat(objectNode.get("connCfg").get("rntbd").asText()).isEqualTo("(cto:PT5S, nrto:PT5S, icto:PT0S, ieto:PT1H, mcpe:130, mrpc:30, cer:true)");
Expand Down Expand Up @@ -238,7 +235,7 @@ public void gw() throws Exception {
ObjectNode objectNode = (ObjectNode) objectMapper.readTree(jsonWriter.toString());

assertThat(objectNode.get("id").asInt()).isEqualTo(1);
assertThat(objectNode.get("machineId").asText()).isEqualTo(Strings.isNullOrEmpty(vmInstanceMachineId) ? machineId : vmInstanceMachineId);
assertThat(objectNode.get("machineId").asText()).isEqualTo(diagnosticsClientConfig.getMachineId());
assertThat(objectNode.get("numberOfClients").asInt()).isEqualTo(2);
assertThat(objectNode.get("consistencyCfg").asText()).isEqualTo("(consistency: null, readConsistencyStrategy: null, mm: false, prgns: [null])");
assertThat(objectNode.get("connCfg").get("rntbd").asText()).isEqualTo("null");
Expand Down Expand Up @@ -312,7 +309,7 @@ public void full(
ObjectNode objectNode = (ObjectNode) objectMapper.readTree(jsonWriter.toString());

assertThat(objectNode.get("id").asInt()).isEqualTo(1);
assertThat(objectNode.get("machineId").asText()).isEqualTo(Strings.isNullOrEmpty(vmInstanceMachineId) ? machineId : vmInstanceMachineId);
assertThat(objectNode.get("machineId").asText()).isEqualTo(diagnosticsClientConfig.getMachineId());
assertThat(objectNode.get("numberOfClients").asInt()).isEqualTo(2);
assertThat(objectNode.get("consistencyCfg").asText()).isEqualTo("(consistency: null, readConsistencyStrategy: null, mm: false, prgns: [westus1,westus2])");
assertThat(objectNode.get("connCfg").get("rntbd").asText()).isEqualTo("null");
Expand Down Expand Up @@ -365,7 +362,7 @@ public void sessionRetryOptionsInDiagnostics(SessionRetryOptions sessionRetryOpt
ObjectNode objectNode = (ObjectNode) objectMapper.readTree(jsonWriter.toString());

assertThat(objectNode.get("id").asInt()).isEqualTo(1);
assertThat(objectNode.get("machineId").asText()).isEqualTo(Strings.isNullOrEmpty(vmInstanceMachineId) ? machineId : vmInstanceMachineId);
assertThat(objectNode.get("machineId").asText()).isEqualTo(diagnosticsClientConfig.getMachineId());
assertThat(objectNode.get("numberOfClients").asInt()).isEqualTo(2);
assertThat(objectNode.get("consistencyCfg").asText()).isEqualTo("(consistency: null, readConsistencyStrategy: null, mm: false, prgns: [null])");
assertThat(objectNode.get("connCfg").get("rntbd").asText()).isEqualTo("null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,70 @@ public void getThroughputControlConfigForFeedRange() {
assertThat(pkRangeThroughputControlConfig.getTargetThroughputThreshold()).isEqualTo(throughputControlGroupConfig.getTargetThroughputThreshold()/allLeases.size());
assertThat(pkRangeThroughputControlConfig.getPriorityLevel()).isEqualTo(throughputControlGroupConfig.getPriorityLevel());
}

@Test(groups = "unit")
public void getOrCreateThroughputControlConfigForFeedRange_withThroughputBucket() {

ThroughputControlGroupConfig throughputControlGroupConfig =
new ThroughputControlGroupConfigBuilder()
.groupName("test-" + UUID.randomUUID())
.priorityLevel(PriorityLevel.LOW)
.throughputBucket(2)
.build();

ChangeFeedContextClient documentClientMock = Mockito.mock(ChangeFeedContextClient.class);
CosmosAsyncContainer containerMock = Mockito.mock(CosmosAsyncContainer.class);
Mockito.doReturn(containerMock).when(documentClientMock).getContainerClient();
Mockito.doNothing().when(containerMock).enableServerThroughputControlGroup(Mockito.any());

FeedRangeThroughputControlConfigManager throughputControlConfigManager =
new FeedRangeThroughputControlConfigManager(throughputControlGroupConfig, documentClientMock);

FeedRangeEpkImpl feedRangeEpk = new FeedRangeEpkImpl(new Range<>("AA", "CC", true, false));

ThroughputControlGroupConfig pkRangeThroughputControlConfig =
throughputControlConfigManager.getOrCreateThroughputControlConfigForFeedRange(feedRangeEpk).block();

assertThat(pkRangeThroughputControlConfig).isNotNull();
assertThat(pkRangeThroughputControlConfig.getGroupName()).isEqualTo(throughputControlGroupConfig.getGroupName());
assertThat(pkRangeThroughputControlConfig.getPriorityLevel()).isEqualTo(throughputControlGroupConfig.getPriorityLevel());
assertThat(pkRangeThroughputControlConfig.getThroughputBucket()).isEqualTo(2);

Mockito.verify(containerMock, Mockito.times(1)).enableServerThroughputControlGroup(throughputControlGroupConfig);
Mockito.verify(containerMock, Mockito.never()).enableLocalThroughputControlGroup(Mockito.any());
}

@Test(groups = "unit")
public void getOrCreateThroughputControlConfigForFeedRange_withThroughputBucket_enableServerGroupFails() {

ThroughputControlGroupConfig throughputControlGroupConfig =
new ThroughputControlGroupConfigBuilder()
.groupName("test-" + UUID.randomUUID())
.priorityLevel(PriorityLevel.LOW)
.throughputBucket(2)
.build();

ChangeFeedContextClient documentClientMock = Mockito.mock(ChangeFeedContextClient.class);
CosmosAsyncContainer containerMock = Mockito.mock(CosmosAsyncContainer.class);
Mockito.doReturn(containerMock).when(documentClientMock).getContainerClient();
Mockito.doThrow(new IllegalStateException("test exception"))
.when(containerMock)
.enableServerThroughputControlGroup(Mockito.any());

FeedRangeThroughputControlConfigManager throughputControlConfigManager =
new FeedRangeThroughputControlConfigManager(throughputControlGroupConfig, documentClientMock);

FeedRangeEpkImpl feedRangeEpk = new FeedRangeEpkImpl(new Range<>("AA", "CC", true, false));

ThroughputControlGroupConfig pkRangeThroughputControlConfig =
throughputControlConfigManager.getOrCreateThroughputControlConfigForFeedRange(feedRangeEpk).block();

assertThat(pkRangeThroughputControlConfig).isNotNull();
assertThat(pkRangeThroughputControlConfig.getGroupName()).isEqualTo(throughputControlGroupConfig.getGroupName());
assertThat(pkRangeThroughputControlConfig.getPriorityLevel()).isEqualTo(throughputControlGroupConfig.getPriorityLevel());
assertThat(pkRangeThroughputControlConfig.getThroughputBucket()).isEqualTo(2);

Mockito.verify(containerMock, Mockito.times(1)).enableServerThroughputControlGroup(throughputControlGroupConfig);
Mockito.verify(containerMock, Mockito.never()).enableLocalThroughputControlGroup(Mockito.any());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,68 @@ public void getThroughputControlConfigForFeedRange() {
assertThat(pkRangeThroughputControlConfig.getTargetThroughputThreshold()).isEqualTo(throughputControlGroupConfig.getTargetThroughputThreshold());
assertThat(pkRangeThroughputControlConfig.getPriorityLevel()).isEqualTo(throughputControlGroupConfig.getPriorityLevel());
}

@Test(groups = "unit")
public void getThroughputControlConfigForFeedRange_withThroughputBucket() {

ThroughputControlGroupConfig throughputControlGroupConfig =
new ThroughputControlGroupConfigBuilder()
.groupName("test-" + UUID.randomUUID())
.priorityLevel(PriorityLevel.LOW)
.throughputBucket(2)
.build();

ChangeFeedContextClient documentClientMock = Mockito.mock(ChangeFeedContextClient.class);
CosmosAsyncContainer containerMock = Mockito.mock(CosmosAsyncContainer.class);
Mockito.doReturn(containerMock).when(documentClientMock).getContainerClient();
Mockito.doNothing().when(containerMock).enableServerThroughputControlGroup(Mockito.any());

FeedRangeThroughputControlConfigManager throughputControlConfigManager =
new FeedRangeThroughputControlConfigManager(throughputControlGroupConfig, documentClientMock);

FeedRange feedRange = new FeedRangePartitionKeyRangeImpl("1");
ThroughputControlGroupConfig pkRangeThroughputControlConfig =
throughputControlConfigManager.getThroughputControlConfigForFeedRange(feedRange);

assertThat(pkRangeThroughputControlConfig).isNotNull();
assertThat(pkRangeThroughputControlConfig.getGroupName()).isEqualTo(throughputControlGroupConfig.getGroupName());
assertThat(pkRangeThroughputControlConfig.getPriorityLevel()).isEqualTo(throughputControlGroupConfig.getPriorityLevel());
assertThat(pkRangeThroughputControlConfig.getThroughputBucket()).isEqualTo(2);

Mockito.verify(containerMock, Mockito.times(1)).enableServerThroughputControlGroup(throughputControlGroupConfig);
Mockito.verify(containerMock, Mockito.never()).enableLocalThroughputControlGroup(Mockito.any());
}

@Test(groups = "unit")
public void getThroughputControlConfigForFeedRange_withThroughputBucket_enableServerGroupFails() {

ThroughputControlGroupConfig throughputControlGroupConfig =
new ThroughputControlGroupConfigBuilder()
.groupName("test-" + UUID.randomUUID())
.priorityLevel(PriorityLevel.LOW)
.throughputBucket(2)
.build();

ChangeFeedContextClient documentClientMock = Mockito.mock(ChangeFeedContextClient.class);
CosmosAsyncContainer containerMock = Mockito.mock(CosmosAsyncContainer.class);
Mockito.doReturn(containerMock).when(documentClientMock).getContainerClient();
Mockito.doThrow(new IllegalStateException("test exception"))
.when(containerMock)
.enableServerThroughputControlGroup(Mockito.any());

FeedRangeThroughputControlConfigManager throughputControlConfigManager =
new FeedRangeThroughputControlConfigManager(throughputControlGroupConfig, documentClientMock);

FeedRange feedRange = new FeedRangePartitionKeyRangeImpl("1");
ThroughputControlGroupConfig pkRangeThroughputControlConfig =
throughputControlConfigManager.getThroughputControlConfigForFeedRange(feedRange);

assertThat(pkRangeThroughputControlConfig).isNotNull();
assertThat(pkRangeThroughputControlConfig.getGroupName()).isEqualTo(throughputControlGroupConfig.getGroupName());
assertThat(pkRangeThroughputControlConfig.getPriorityLevel()).isEqualTo(throughputControlGroupConfig.getPriorityLevel());
assertThat(pkRangeThroughputControlConfig.getThroughputBucket()).isEqualTo(2);

Mockito.verify(containerMock, Mockito.times(1)).enableServerThroughputControlGroup(throughputControlGroupConfig);
Mockito.verify(containerMock, Mockito.never()).enableLocalThroughputControlGroup(Mockito.any());
}
}
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 4.82.0-beta.1 (Unreleased)

#### Features Added
* Added throughput bucket support for Change Feed Processor feed polling throughput control. - See [Issue 49487](https://github.com/Azure/azure-sdk-for-java/issues/49487)

#### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

Expand All @@ -34,6 +35,7 @@ public class FeedRangeThroughputControlConfigManager {
private final AtomicReference<List<FeedRangeEpkImpl>> leaseTokens; // epk leases
private final Map<PartitionKeyRange, List<FeedRange>> pkRangeToFeedRangeMap;
private final Map<FeedRange, ThroughputControlGroupConfig> feedRangeToThroughputControlGroupConfigMap;
private final AtomicBoolean throughputControlGroupEnabled;

public FeedRangeThroughputControlConfigManager(
ThroughputControlGroupConfig throughputControlGroupConfig,
Expand All @@ -47,6 +49,7 @@ public FeedRangeThroughputControlConfigManager(
this.leaseTokens = new AtomicReference<>();
this.pkRangeToFeedRangeMap = new ConcurrentHashMap<>();
this.feedRangeToThroughputControlGroupConfigMap = new ConcurrentHashMap<>();
this.throughputControlGroupEnabled = new AtomicBoolean(false);
}

/**
Expand Down Expand Up @@ -88,6 +91,23 @@ public Mono<Void> refresh(List<Lease> leases) {

public Mono<ThroughputControlGroupConfig> getOrCreateThroughputControlConfigForFeedRange(FeedRangeEpkImpl feedRange) {
checkNotNull(feedRange, "Argument 'feedRange' can not be null");

if (this.throughputControlGroupConfig.getThroughputBucket() != null) {
return Mono.defer(() -> {
if (this.throughputControlGroupEnabled.compareAndSet(false, true)) {
try {
this.documentClient.getContainerClient()
.enableServerThroughputControlGroup(this.throughputControlGroupConfig);
} catch (Exception ex) {
logger.warn(
"Enable server throughput control group failed, continuing without throughput control.",
ex);
}
}
return Mono.just(this.throughputControlGroupConfig);
});
}

ThroughputControlGroupConfig throughputControlGroupConfigForFeedRange =
this.feedRangeToThroughputControlGroupConfigMap.get(feedRange);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import com.azure.cosmos.ThroughputControlGroupConfig;
import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
import com.azure.cosmos.models.FeedRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.AtomicBoolean;

Expand All @@ -14,6 +16,7 @@
// Only used in CFP when customer configure throughput control config
// The main purpose of this class is to create corresponding throughput control group for each feed range
public class FeedRangeThroughputControlConfigManager {
private static final Logger logger = LoggerFactory.getLogger(FeedRangeThroughputControlConfigManager.class);

private final ThroughputControlGroupConfig throughputControlGroupConfig;
private final ChangeFeedContextClient documentClient;
Expand All @@ -36,12 +39,24 @@ public ThroughputControlGroupConfig getThroughputControlConfigForFeedRange(FeedR

// for pkRange leases, it has only been used to support for split
// the lease feed range and partition key range is always a 1:1 mapping
// throughput control internally will divide the target RU across all pk ranges
// so all pk ranges can use the same local throughput control group
// for local throughput control, throughput control internally will divide the target RU across all pk ranges
// so all pk ranges can use the same local throughput control group. Throughput bucket uses server
// throughput control and can also share the same group across all pk ranges.
// Note: if global throughput control be added in future, then we will need to create one group per pkRange

if (this.throughputControlGroupEnabled.compareAndSet(false, true)) {
this.documentClient.getContainerClient().enableLocalThroughputControlGroup(this.throughputControlGroupConfig);
if (this.throughputControlGroupConfig.getThroughputBucket() != null) {
try {
this.documentClient.getContainerClient()
.enableServerThroughputControlGroup(this.throughputControlGroupConfig);
} catch (Exception ex) {
logger.warn(
"Enable server throughput control group failed, continuing without throughput control.",
ex);
}
} else {
Comment thread
arnabnandy7 marked this conversation as resolved.
this.documentClient.getContainerClient().enableLocalThroughputControlGroup(this.throughputControlGroupConfig);
}
}

return this.throughputControlGroupConfig;
Expand Down