diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/BenchmarkOrchestrator.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/BenchmarkOrchestrator.java index acefe02df455..08c95bfe3ff2 100644 --- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/BenchmarkOrchestrator.java +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/BenchmarkOrchestrator.java @@ -220,11 +220,11 @@ private void runLifecycleLoop(BenchmarkConfig config) throws Exception { runWorkload(benchmarks, cycle, executor); logger.info("[LIFECYCLE] POST_WORKLOAD cycle={} timestamp={}", cycle, Instant.now()); - // 3. Close all clients + // 5. Close all clients shutdownBenchmarks(benchmarks, cycle); logger.info("[LIFECYCLE] POST_CLOSE cycle={} timestamp={}", cycle, Instant.now()); - // 4. Settle + // 6. Settle if (config.getSettleTimeMs() > 0) { logger.info(" Settling for {}ms...", config.getSettleTimeMs()); long halfSettle = config.getSettleTimeMs() / 2; diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/SyncBenchmark.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/SyncBenchmark.java index 24b7b04cfe2b..545bc515a9fc 100644 --- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/SyncBenchmark.java +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/SyncBenchmark.java @@ -185,11 +185,13 @@ protected void init() { CompletableFuture futureResult = CompletableFuture.supplyAsync(() -> { int maxRetries = 5; + Exception lastException = null; for (int attempt = 0; attempt <= maxRetries; attempt++) { try { CosmosItemResponse itemResponse = cosmosContainer.createItem(newDoc); return toPojoizedJson(itemResponse); } catch (CosmosException ce) { + lastException = ce; if (ce.getStatusCode() == 409) { // conflict — document already exists, read it back try { @@ -214,10 +216,11 @@ protected void init() { } throw propagate(ce); } catch (Exception e) { + lastException = e; throw propagate(e); } } - throw new RuntimeException("Exhausted retries for createItem"); + throw new RuntimeException("Exhausted retries for createItem", lastException); }, executorService); diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/TenantWorkloadConfig.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/TenantWorkloadConfig.java index 03f43408ff26..20d3838d3375 100644 --- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/TenantWorkloadConfig.java +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/TenantWorkloadConfig.java @@ -335,6 +335,12 @@ public ConnectionMode getConnectionMode() { public ConsistencyLevel getConsistencyLevel() { if (consistencyLevel == null) return ConsistencyLevel.SESSION; + for (ConsistencyLevel level : ConsistencyLevel.values()) { + if (level.toString().equalsIgnoreCase(consistencyLevel) + || level.name().equalsIgnoreCase(consistencyLevel)) { + return level; + } + } return ConsistencyLevel.valueOf(consistencyLevel.toUpperCase()); } diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ctl/AsyncCtlWorkload.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ctl/AsyncCtlWorkload.java index d13c5ee778da..3de648c28374 100644 --- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ctl/AsyncCtlWorkload.java +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ctl/AsyncCtlWorkload.java @@ -280,7 +280,7 @@ private void createPrePopulatedDocs(int numberOfPreCreatedDocuments) { } else { failureCount.incrementAndGet(); failedResponses.add(response); - logger.debug("Error during pre populating item {}", + logger.debug("Error during pre-populating: {}", response.getException() != null ? response.getException().getMessage() : "unknown error"); } }) diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/test/java/com/azure/cosmos/benchmark/WorkflowTest.java b/sdk/cosmos/azure-cosmos-benchmark/src/test/java/com/azure/cosmos/benchmark/WorkflowTest.java index 374f67ef1c09..7a824b84d258 100644 --- a/sdk/cosmos/azure-cosmos-benchmark/src/test/java/com/azure/cosmos/benchmark/WorkflowTest.java +++ b/sdk/cosmos/azure-cosmos-benchmark/src/test/java/com/azure/cosmos/benchmark/WorkflowTest.java @@ -11,13 +11,14 @@ import com.azure.cosmos.models.IncludedPath; import com.azure.cosmos.models.IndexingPolicy; import com.azure.cosmos.models.PartitionKeyDefinition; -import org.apache.commons.lang3.StringUtils; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import reactor.core.scheduler.Schedulers; +import java.io.File; +import java.io.FileWriter; import java.util.ArrayList; import java.util.List; import java.util.UUID; @@ -32,17 +33,22 @@ public class WorkflowTest { @Test(groups = "fast", timeOut = TIMEOUT) public void readMyWritesCLI() throws Exception { - String cmdFormat = "-serviceEndpoint %s -masterKey %s" + - " -databaseId %s -collectionId %s" + - " -consistencyLevel SESSION -concurrency 2 -numberOfOperations 123" + - " -operation ReadMyWrites -connectionMode DIRECT -numberOfPreCreatedDocuments 100"; - - String cmd = String.format(cmdFormat, - TestConfigurations.HOST, - TestConfigurations.MASTER_KEY, - database.getId(), - collection.getId()); - Main.main(StringUtils.split(cmd)); + File configFile = createWorkloadConfigFile( + TestConfigurations.HOST, + TestConfigurations.MASTER_KEY, + database.getId(), + collection.getId(), + "ReadMyWrites", + "DIRECT", + "SESSION", + 2, + 123, + 100); + try { + Main.main(new String[]{"-workloadConfig", configFile.getAbsolutePath()}); + } finally { + configFile.delete(); + } } @Test(dataProvider = "collectionLinkTypeArgProvider", groups = "fast", timeOut = TIMEOUT) @@ -85,17 +91,22 @@ protected void onSuccess() { @Test(groups = "fast", timeOut = TIMEOUT) public void writeThroughputCLI() throws Exception { - String cmdFormat = "-serviceEndpoint %s -masterKey %s" + - " -databaseId %s -collectionId %s" + - " -consistencyLevel SESSION -concurrency 2 -numberOfOperations 1000" + - " -operation WriteThroughput -connectionMode DIRECT"; - - String cmd = String.format(cmdFormat, - TestConfigurations.HOST, - TestConfigurations.MASTER_KEY, - database.getId(), - collection.getId()); - Main.main(StringUtils.split(cmd)); + File configFile = createWorkloadConfigFile( + TestConfigurations.HOST, + TestConfigurations.MASTER_KEY, + database.getId(), + collection.getId(), + "WriteThroughput", + "DIRECT", + "SESSION", + 2, + 1000, + 0); + try { + Main.main(new String[]{"-workloadConfig", configFile.getAbsolutePath()}); + } finally { + configFile.delete(); + } } @Test(dataProvider = "collectionLinkTypeArgProvider", groups = "fast", timeOut = TIMEOUT) @@ -292,6 +303,42 @@ public void afterClass() { Utils.safeClose(housekeepingClient); } + private File createWorkloadConfigFile( + String serviceEndpoint, + String masterKey, + String databaseId, + String containerId, + String operation, + String connectionMode, + String consistencyLevel, + int concurrency, + int numberOfOperations, + int numberOfPreCreatedDocuments) throws Exception { + + String json = String.format( + "{ \"tenants\": [{ " + + "\"serviceEndpoint\": \"%s\", " + + "\"masterKey\": \"%s\", " + + "\"databaseId\": \"%s\", " + + "\"containerId\": \"%s\", " + + "\"operation\": \"%s\", " + + "\"connectionMode\": \"%s\", " + + "\"consistencyLevel\": \"%s\", " + + "\"concurrency\": %d, " + + "\"numberOfOperations\": %d, " + + "\"numberOfPreCreatedDocuments\": %d " + + "}] }", + serviceEndpoint, masterKey, databaseId, containerId, + operation, connectionMode, consistencyLevel, + concurrency, numberOfOperations, numberOfPreCreatedDocuments); + + File tempFile = File.createTempFile("workload-config-", ".json"); + try (FileWriter writer = new FileWriter(tempFile)) { + writer.write(json); + } + return tempFile; + } + DocumentCollection getCollectionDefinitionWithRangeRangeIndex() { PartitionKeyDefinition partitionKeyDef = new PartitionKeyDefinition(); ArrayList paths = new ArrayList<>(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/Http2ResponseHeaderCleanerHandler.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/Http2ResponseHeaderCleanerHandler.java index ce3204954783..d13192c7caed 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/Http2ResponseHeaderCleanerHandler.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/Http2ResponseHeaderCleanerHandler.java @@ -4,13 +4,13 @@ package com.azure.cosmos.implementation.http; import com.azure.cosmos.implementation.HttpConstants; -import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2HeadersFrame; import io.netty.handler.codec.http2.Http2SettingsAckFrame; import io.netty.handler.codec.http2.Http2SettingsFrame; +import io.netty.util.AsciiString; import io.netty.util.ReferenceCountUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,29 +18,21 @@ public class Http2ResponseHeaderCleanerHandler extends ChannelInboundHandlerAdapter { private static final Logger logger = LoggerFactory.getLogger(Http2ResponseHeaderCleanerHandler.class); + private static final AsciiString SERVER_VERSION_KEY = AsciiString.of(HttpConstants.HttpHeaders.SERVER_VERSION); + @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof Http2HeadersFrame) { Http2HeadersFrame headersFrame = (Http2HeadersFrame) msg; Http2Headers headers = headersFrame.headers(); - headers.forEach(entry -> { - CharSequence key = entry.getKey(); - CharSequence value = entry.getValue(); - - // Based on the tests, only 'x-ms-serviceversion' header has extra value, - // so only check this specific header here - if (StringUtils.equalsIgnoreCase(key, HttpConstants.HttpHeaders.SERVER_VERSION)) { - // Check for leading whitespace or other prohibited characters - if (StringUtils.isNotEmpty(value) && (value.charAt(0) == ' ' || value.charAt(value.length() - 1) == ' ')) { - // Clean up the header value by trimming or handling as needed - logger.trace("There are extra white space for key {} with value {}", key, value); - - // TODO[Http2]: for now just trim the spaces, explore other options for example escape the whitespace - headers.set(key, value.toString().trim()); - } - } - }); + // Direct O(1) hash lookup instead of O(n) forEach iteration over all headers + CharSequence serverVersion = headers.get(SERVER_VERSION_KEY); + if (serverVersion != null && serverVersion.length() > 0 + && (serverVersion.charAt(0) == ' ' || serverVersion.charAt(serverVersion.length() - 1) == ' ')) { + logger.trace("There is extra whitespace for key {} with value {}", SERVER_VERSION_KEY, serverVersion); + headers.set(SERVER_VERSION_KEY, serverVersion.toString().trim()); + } super.channelRead(ctx, msg); } else if (msg instanceof Http2SettingsAckFrame) {