From 5b1b321a6776babff42c549f31f23c858214efbc Mon Sep 17 00:00:00 2001
From: Annie Liang
Date: Thu, 22 Jan 2026 09:48:00 -0800
Subject: [PATCH] remove transactional batch from public api

---
 .../docs/configuration-reference.md           |    1 -
 .../docs/scenarios/Ingestion.md               |   69 --
 .../com/azure/cosmos/spark/CosmosConfig.scala |   12 +-
 .../spark/TransactionalBatchITest.scala       | 1000 -----------------
 4 files changed, 1 insertion(+), 1081 deletions(-)
 delete mode 100644 sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/TransactionalBatchITest.scala

diff --git a/sdk/cosmos/azure-cosmos-spark_3/docs/configuration-reference.md b/sdk/cosmos/azure-cosmos-spark_3/docs/configuration-reference.md
index 5d9907342ba7..3c65b3fdae5c 100644
--- a/sdk/cosmos/azure-cosmos-spark_3/docs/configuration-reference.md
+++ b/sdk/cosmos/azure-cosmos-spark_3/docs/configuration-reference.md
@@ -67,7 +67,6 @@
 | `spark.cosmos.write.point.maxConcurrency` | None | Cosmos DB Item Write Max concurrency. If not specified it will be determined based on the Spark executor VM Size |
 | `spark.cosmos.write.bulk.maxPendingOperations` | None | Cosmos DB Item Write bulk mode maximum pending operations. Defines a limit of bulk operations being processed concurrently. If not specified it will be determined based on the Spark executor VM Size. If the volume of data is large for the provisioned throughput on the destination container, this setting can be adjusted by following the estimation of `1000 x Cores` |
 | `spark.cosmos.write.bulk.enabled` | `true` | Cosmos DB Item Write bulk enabled |
-| `spark.cosmos.write.bulk.transactional` | `false` | Enable transactional batch mode for bulk writes. When enabled, all operations for the same partition key are executed atomically (all succeed or all fail). Requires ordering and clustering by partition key columns. Only supports upsert operations. Cannot exceed 100 operations or 2MB per partition key. **Note**: For containers using hierarchical partition keys (HPK), transactional scope applies only to **logical partitions** (complete partition key paths), not partial top-level keys. See [Transactional Batch documentation](https://learn.microsoft.com/azure/cosmos-db/transactional-batch) for details. |\n| `spark.cosmos.write.bulk.targetedPayloadSizeInBytes` | `220201` | When the targeted payload size is reached for buffered documents, the request is sent to the backend. The default value is optimized for small documents <= 10 KB - when documents often exceed 110 KB, it can help to increase this value to up to about `1500000` (should still be smaller than 2 MB). |
 | `spark.cosmos.write.bulk.initialBatchSize` | `100` | Cosmos DB initial bulk micro batch size - a micro batch will be flushed to the backend when the number of documents enqueued exceeds this size - or the target payload size is met. The micro batch size is getting automatically tuned based on the throttling rate. By default the initial micro batch size is 100. Reduce this when you want to avoid that the first few requests consume too many RUs. |
 | `spark.cosmos.write.bulk.maxBatchSize` | `100` | Cosmos DB max. bulk micro batch size - a micro batch will be flushed to the backend when the number of documents enqueued exceeds this size - or the target payload size is met. The micro batch size is getting automatically tuned based on the throttling rate. By default the max. micro batch size is 100. Use this setting only when migrating Spark 2.4 workloads - for other scenarios relying on the auto-tuning combined with throughput control will result in better experience. |
 | `spark.cosmos.write.flush.noProgress.maxIntervalInSeconds` | `180` | The time interval in seconds that write operations will wait when no progress can be made for bulk writes before forcing a retry. The retry will reinitialize the bulk write process - so, any delays on the retry can be sure to be actual service issues. The default value of 3 min should be sufficient to prevent false negatives when there is a short service-side write unavailability - like for partition splits or merges. Increase it only if you regularly see these transient errors to exceed a time period of 180 seconds. |
diff --git a/sdk/cosmos/azure-cosmos-spark_3/docs/scenarios/Ingestion.md b/sdk/cosmos/azure-cosmos-spark_3/docs/scenarios/Ingestion.md
index b7a206aec6d6..0825a513458f 100644
--- a/sdk/cosmos/azure-cosmos-spark_3/docs/scenarios/Ingestion.md
+++ b/sdk/cosmos/azure-cosmos-spark_3/docs/scenarios/Ingestion.md
@@ -79,75 +79,6 @@ When your container has a "unique key constraint policy" any 409 "Conflict" (ind
 - For `ItemOverwrite` a 409 - Conflict due to unique key violation will result in an error - and the Spark job will fail. *NOTE: Conflicts due to pk+id being identical to another document won't even result in a 409 - because with Upsert the existing document would simply be updated.*
 - For `ItemAppend` like conflicts on pk+id any unique key policy constraint violation will be ignored.

-### Transactional batch writes
-
-For scenarios requiring atomic all-or-nothing semantics within a partition, you can enable transactional batch writes using the `spark.cosmos.write.bulk.transactional` configuration. When enabled, all operations for a single partition key value either succeed or fail together.
-
-#### Configuration
-
-**Python example:**
-```python
-df.write \
-  .format("cosmos.oltp") \
-  .option("spark.cosmos.accountEndpoint", cosmosEndpoint) \
-  .option("spark.cosmos.accountKey", cosmosKey) \
-  .option("spark.cosmos.database", cosmosDatabase) \
-  .option("spark.cosmos.container", cosmosContainer) \
-  .option("spark.cosmos.write.bulk.enabled", "true") \
-  .option("spark.cosmos.write.bulk.transactional", "true") \
-  .mode("Append") \
-  .save()
-```
-
-**Scala example:**
-```scala
-df.write
-  .format("cosmos.oltp")
-  .option("spark.cosmos.accountEndpoint", cosmosEndpoint)
-  .option("spark.cosmos.accountKey", cosmosKey)
-  .option("spark.cosmos.database", cosmosDatabase)
-  .option("spark.cosmos.container", cosmosContainer)
-  .option("spark.cosmos.write.bulk.enabled", "true")
-  .option("spark.cosmos.write.bulk.transactional", "true")
-  .mode(SaveMode.Append)
-  .save()
-```
-
-#### Characteristics and limitations
-
-- **Atomic semantics**: All operations for the same partition key succeed or all fail (rollback)
-- **Operation type**: Only upsert operations are supported (equivalent to `ItemOverwrite` write strategy)
-- **Partition grouping**: Spark automatically partitions and orders data by partition key columns
-- **Size limits**: Maximum 100 operations per transaction; maximum 2MB total payload per transaction. These limits are **enforced service-side by Azure Cosmos DB** and cannot be configured client-side.
-- **Partition key requirement**: All operations in a transaction must share the same partition key value
-- **Hierarchical partition keys**: For containers using hierarchical partition keys (HPK), transactional scope applies only to **logical partitions** (complete partition key paths), not partial top-level keys
-- **Bulk mode required**: Must have `spark.cosmos.write.bulk.enabled=true` (enabled by default)
-
-#### Use cases
-
-Transactional batch writes are ideal for:
-- Financial transactions requiring consistency across multiple documents (e.g., double-entry bookkeeping, account balance updates)
-- Order processing where order header and line items must be committed together
-- Multi-document updates that must be atomic (e.g., inventory adjustments)
-- **Temporal data scenarios** where historical versioning must be consistent (e.g., financial instrument versioning, audit trails, snapshot consistency)
-- Any scenario where partial success would leave data in an inconsistent state
-
-#### Error handling
-
-Errors during transactional batch execution fall into two categories:
-
-**Transient errors** (automatically retried):
-- Insufficient RUs (429 throttling) - the entire transaction is retried according to the SDK's retry policy
-- Temporary network issues or service unavailability - automatic retry with exponential backoff
-
-**Non-transient errors** (Spark job fails):
-- Transaction size limit exceeded (> 100 operations or > 2MB payload)
-- Document validation errors (e.g., blank ID, schema violations)
-- Unique key constraint violations
-- Any service-side rejection that cannot be resolved by retry
-
-When any operation in a transaction fails with a non-transient error, the entire transaction is rolled back and no documents are modified. The Spark task will fail and may retry according to Spark's retry policy.
-
 ## Preparation

 Below are a couple of tips/best-practices that can help you to prepare for a data migration into a Cosmos DB container.
diff --git a/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala b/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala
index f28e459b2d83..9a50d6ced223 100644
--- a/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala
+++ b/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala
@@ -113,7 +113,6 @@ private[spark] object CosmosConfigNames {
   val ClientTelemetryEnabled = "spark.cosmos.clientTelemetry.enabled" // keep this to avoid breaking changes
   val ClientTelemetryEndpoint = "spark.cosmos.clientTelemetry.endpoint" // keep this to avoid breaking changes
   val WriteBulkEnabled = "spark.cosmos.write.bulk.enabled"
-  val WriteBulkTransactional = "spark.cosmos.write.bulk.transactional"
   val WriteBulkMaxPendingOperations = "spark.cosmos.write.bulk.maxPendingOperations"
   val WriteBulkMaxBatchSize = "spark.cosmos.write.bulk.maxBatchSize"
   val WriteBulkMinTargetBatchSize = "spark.cosmos.write.bulk.minTargetBatchSize"
@@ -243,7 +242,6 @@ private[spark] object CosmosConfigNames {
     ClientTelemetryEnabled,
     ClientTelemetryEndpoint,
     WriteBulkEnabled,
-    WriteBulkTransactional,
     WriteBulkMaxPendingOperations,
     WriteBulkMaxConcurrentPartitions,
     WriteBulkPayloadSizeInBytes,
@@ -1489,14 +1487,6 @@ private object CosmosWriteConfig {
     parseFromStringFunction = bulkEnabledAsString => bulkEnabledAsString.toBoolean,
     helpMessage = "Cosmos DB Item Write bulk enabled")

-  private val bulkTransactional = CosmosConfigEntry[Boolean](key = CosmosConfigNames.WriteBulkTransactional,
-    mandatory = false,
-    defaultValue = Option.apply(false),
-    parseFromStringFunction = bulkTransactionalAsString => bulkTransactionalAsString.toBoolean,
-    helpMessage = "Cosmos DB Item Write bulk transactional batch mode enabled - requires bulk write to be enabled. " +
-      "Spark 3.5+ provides automatic distribution/ordering for transactional batch. " +
-      "On Spark 3.3/3.4, transactional batch is supported but requires manual sorting for optimal performance.")
-
   private val microBatchPayloadSizeInBytes = CosmosConfigEntry[Int](key = CosmosConfigNames.WriteBulkPayloadSizeInBytes,
     defaultValue = Option.apply(BatchRequestResponseConstants.DEFAULT_MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES),
     mandatory = false,
@@ -1769,7 +1759,7 @@ private object CosmosWriteConfig {
     val itemWriteStrategyOpt = CosmosConfigEntry.parse(cfg, itemWriteStrategy)
     val maxRetryCountOpt = CosmosConfigEntry.parse(cfg, maxRetryCount)
     val bulkEnabledOpt = CosmosConfigEntry.parse(cfg, bulkEnabled)
-    val bulkTransactionalOpt = CosmosConfigEntry.parse(cfg, bulkTransactional)
+    val bulkTransactionalOpt = Some(false)
     var patchConfigsOpt = Option.empty[CosmosPatchConfigs]
     val throughputControlConfigOpt = CosmosThroughputControlConfig.parseThroughputControlConfig(cfg)
     val microBatchPayloadSizeInBytesOpt = CosmosConfigEntry.parse(cfg, microBatchPayloadSizeInBytes)
diff --git a/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/TransactionalBatchITest.scala b/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/TransactionalBatchITest.scala
deleted file mode 100644
index 3b9feae2fdbd..000000000000
--- a/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/TransactionalBatchITest.scala
+++ /dev/null
@@ -1,1000 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-package com.azure.cosmos.spark - -import com.azure.cosmos.implementation.{TestConfigurations, Utils} -import com.azure.cosmos.models.{PartitionKey, PartitionKeyBuilder} -import com.azure.cosmos.{CosmosAsyncClient, CosmosException} -import com.azure.cosmos.test.faultinjection._ -import com.fasterxml.jackson.databind.node.ObjectNode -import org.apache.spark.sql.types._ -import org.apache.spark.sql.{Row, SaveMode} - -import java.time.Duration - -import java.util.UUID -// scalastyle:off underscore.import -import scala.collection.JavaConverters._ -// scalastyle:on underscore.import - -class TransactionalBatchITest extends IntegrationSpec - with Spark - with AutoCleanableCosmosContainersWithPkAsPartitionKey { - - //scalastyle:off multiple.string.literals - //scalastyle:off magic.number - - // Helper method to get root cause of exception - private def getRootCause(t: Throwable): Throwable = { - if (t.getCause == null) t else getRootCause(t.getCause) - } - - "Transactional Batch" should "create items atomically" in { - val cosmosEndpoint = TestConfigurations.HOST - val cosmosMasterKey = TestConfigurations.MASTER_KEY - val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(cosmosContainersWithPkAsPartitionKey) - val partitionKeyValue = UUID.randomUUID().toString - val item1Id = s"test-item1-${UUID.randomUUID()}" - val item2Id = s"test-item2-${UUID.randomUUID()}" - - // Create batch operations DataFrame with flat columns (like normal writes) - val schema = StructType(Seq( - StructField("id", StringType, nullable = false), - StructField("pk", StringType, nullable = false), - StructField("name", StringType, nullable = false) - )) - - val batchOperations = Seq( - Row(item1Id, partitionKeyValue, "Alice"), - Row(item2Id, partitionKeyValue, "Bob") - ) - - val operationsDf = spark.createDataFrame(batchOperations.asJava, schema) - - // Execute transactional batch using bulk transactional mode - operationsDf.write - .format("cosmos.oltp") - .option("spark.cosmos.accountEndpoint", cosmosEndpoint) - .option("spark.cosmos.accountKey", cosmosMasterKey) - .option("spark.cosmos.database", cosmosDatabase) - .option("spark.cosmos.container", cosmosContainersWithPkAsPartitionKey) - .option("spark.cosmos.write.bulk.transactional", "true") - .option("spark.cosmos.write.bulk.enabled", "true") - .mode(SaveMode.Append) - .save() - - // Verify items were created - val item1 = container.readItem(item1Id, new PartitionKey(partitionKeyValue), classOf[ObjectNode]).block() - item1 should not be null - item1.getItem.get("name").asText() shouldEqual "Alice" - - val item2 = container.readItem(item2Id, new PartitionKey(partitionKeyValue), classOf[ObjectNode]).block() - item2 should not be null - item2.getItem.get("name").asText() shouldEqual "Bob" - } - - it should "rollback all operations on batch size limit failure" in { - val cosmosEndpoint = TestConfigurations.HOST - val cosmosMasterKey = TestConfigurations.MASTER_KEY - val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(cosmosContainersWithPkAsPartitionKey) - val partitionKeyValue = UUID.randomUUID().toString - - // Create a batch that exceeds the 2MB size limit for transactional batches - // Each item is ~20KB, so 101 items will exceed the limit - val schema = StructType(Seq( - StructField("id", StringType, nullable = false), - StructField("pk", StringType, nullable = false), - StructField("largeField", StringType, nullable = false) - )) - - val largeString = "x" * 20000 // 20KB string - val batchOperations = (1 to 101).map { i => - 
Row(s"item-$i-${UUID.randomUUID()}", partitionKeyValue, largeString) - } - - val operationsDf = spark.createDataFrame(batchOperations.asJava, schema) - - // This should fail because batch exceeds size limit - val exception = intercept[Exception] { - operationsDf.write - .format("cosmos.oltp") - .option("spark.cosmos.accountEndpoint", cosmosEndpoint) - .option("spark.cosmos.accountKey", cosmosMasterKey) - .option("spark.cosmos.database", cosmosDatabase) - .option("spark.cosmos.container", cosmosContainersWithPkAsPartitionKey) - .option("spark.cosmos.write.bulk.transactional", "true") - .option("spark.cosmos.write.bulk.enabled", "true") - .mode(SaveMode.Append) - .save() - } - - // Verify the exception indicates batch failure (either size or count limit) - val rootCause = getRootCause(exception) - assert(rootCause.getMessage.contains("more operations") || rootCause.getMessage.contains("size") || rootCause.getMessage.contains("limit"), - s"Expected batch limit error, got: ${rootCause.getMessage}") - - // Verify NO items were created (rolled back) - atomic rollback ensures nothing persisted - val itemsCreated = try { - val countList = container.queryItems( - s"SELECT VALUE COUNT(1) FROM c WHERE c.pk = '$partitionKeyValue'", - classOf[Long] - ).collectList().block() - - if (countList.isEmpty) { - 0 - } else { - countList.get(0).toInt - } - } catch { - case _: Exception => 0 - } - assert(itemsCreated == 0, s"No items should exist due to batch rollback, but found $itemsCreated") - } - - it should "rollback all operations when one has blank ID" in { - val cosmosEndpoint = TestConfigurations.HOST - val cosmosMasterKey = TestConfigurations.MASTER_KEY - val partitionKeyValue = "rollback-blank-id-test" - - // Create a batch where the 5th item has a blank ID (validation error) - val batchOperations = Seq( - ("id1", partitionKeyValue, "value1"), - ("id2", partitionKeyValue, "value2"), - ("id3", partitionKeyValue, "value3"), - ("id4", partitionKeyValue, "value4"), - ("", partitionKeyValue, "blank-id"), // Blank ID will cause error - ("id6", partitionKeyValue, "value6") - ) - - val schema = StructType(Seq( - StructField("id", StringType), - StructField("pk", StringType), - StructField("value", StringType) - )) - - val operationsDf = spark.createDataFrame( - batchOperations.map(op => Row(op._1, op._2, op._3)).asJava, - schema - ) - - // Should throw exception due to blank ID - val exception = intercept[Exception] { - operationsDf.write - .format("cosmos.oltp") - .option("spark.cosmos.accountEndpoint", cosmosEndpoint) - .option("spark.cosmos.accountKey", cosmosMasterKey) - .option("spark.cosmos.database", cosmosDatabase) - .option("spark.cosmos.container", cosmosContainersWithPkAsPartitionKey) - .option("spark.cosmos.write.bulk.transactional", "true") - .mode(SaveMode.Append) - .save() - } - - // Verify the first item (id1) was NOT created - proving rollback worked - val cosmosContainer = cosmosClient.getDatabase(cosmosDatabase).getContainer(cosmosContainersWithPkAsPartitionKey) - val queryResult = cosmosContainer - .queryItems(s"SELECT * FROM c WHERE c.pk = '$partitionKeyValue'", classOf[ObjectNode]) - .collectList() - .block() - - // Should have zero items - all operations rolled back - queryResult.size() shouldBe 0 - } - - it should "reject unsupported write strategies" in { - val cosmosEndpoint = TestConfigurations.HOST - val cosmosMasterKey = TestConfigurations.MASTER_KEY - val partitionKeyValue = UUID.randomUUID().toString - val item1Id = s"test-item1-${UUID.randomUUID()}" - - // Create DataFrame with 
simple schema - val schema = StructType(Seq( - StructField("id", StringType, nullable = false), - StructField("pk", StringType, nullable = false), - StructField("name", StringType, nullable = false) - )) - - val batchOperations = Seq( - Row(item1Id, partitionKeyValue, "TestItem") - ) - - val operationsDf = spark.createDataFrame(batchOperations.asJava, schema) - - // Test ItemAppend (create) - should fail - val appendException = intercept[Exception] { - operationsDf.write - .format("cosmos.oltp") - .option("spark.cosmos.accountEndpoint", cosmosEndpoint) - .option("spark.cosmos.accountKey", cosmosMasterKey) - .option("spark.cosmos.database", cosmosDatabase) - .option("spark.cosmos.container", cosmosContainersWithPkAsPartitionKey) - .option("spark.cosmos.write.bulk.transactional", "true") - .option("spark.cosmos.write.bulk.enabled", "true") - .option("spark.cosmos.write.strategy", "ItemAppend") - .mode(SaveMode.Append) - .save() - } - val appendRootCause = getRootCause(appendException) - assert(appendRootCause.getMessage.contains("Transactional batches only support ItemOverwrite"), - s"Expected ItemAppend rejection, got: ${appendRootCause.getMessage}") - - // Test ItemDelete - should fail - val deleteException = intercept[Exception] { - operationsDf.write - .format("cosmos.oltp") - .option("spark.cosmos.accountEndpoint", cosmosEndpoint) - .option("spark.cosmos.accountKey", cosmosMasterKey) - .option("spark.cosmos.database", cosmosDatabase) - .option("spark.cosmos.container", cosmosContainersWithPkAsPartitionKey) - .option("spark.cosmos.write.bulk.transactional", "true") - .option("spark.cosmos.write.bulk.enabled", "true") - .option("spark.cosmos.write.strategy", "ItemDelete") - .mode(SaveMode.Append) - .save() - } - val deleteRootCause = getRootCause(deleteException) - assert(deleteRootCause.getMessage.contains("Transactional batches only support ItemOverwrite"), - s"Expected ItemDelete rejection, got: ${deleteRootCause.getMessage}") - - // Test ItemOverwriteIfNotModified - should fail - val replaceException = intercept[Exception] { - operationsDf.write - .format("cosmos.oltp") - .option("spark.cosmos.accountEndpoint", cosmosEndpoint) - .option("spark.cosmos.accountKey", cosmosMasterKey) - .option("spark.cosmos.database", cosmosDatabase) - .option("spark.cosmos.container", cosmosContainersWithPkAsPartitionKey) - .option("spark.cosmos.write.bulk.transactional", "true") - .option("spark.cosmos.write.bulk.enabled", "true") - .option("spark.cosmos.write.strategy", "ItemOverwriteIfNotModified") - .mode(SaveMode.Append) - .save() - } - val replaceRootCause = getRootCause(replaceException) - assert(replaceRootCause.getMessage.contains("Transactional batches only support ItemOverwrite"), - s"Expected ItemOverwriteIfNotModified rejection, got: ${replaceRootCause.getMessage}") - } - - it should "support simplified schema with default upsert operation" in { - val cosmosEndpoint = TestConfigurations.HOST - val cosmosMasterKey = TestConfigurations.MASTER_KEY - val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(cosmosContainersWithPkAsPartitionKey) - val partitionKeyValue = UUID.randomUUID().toString - val item1Id = s"test-item1-${UUID.randomUUID()}" - val item2Id = s"test-item2-${UUID.randomUUID()}" - - // Create batch operations DataFrame with simplified schema - val simplifiedSchema = StructType(Seq( - StructField("id", StringType, nullable = false), - StructField("pk", StringType, nullable = false), - StructField("name", StringType, nullable = false), - StructField("age", 
IntegerType, nullable = false) - )) - - val batchOperations = Seq( - Row(item1Id, partitionKeyValue, "Alice", 30), - Row(item2Id, partitionKeyValue, "Bob", 25) - ) - - val operationsDf = spark.createDataFrame(batchOperations.asJava, simplifiedSchema) - - // Execute transactional batch - defaults to ItemOverwrite (upsert) - operationsDf.write - .format("cosmos.oltp") - .option("spark.cosmos.accountEndpoint", cosmosEndpoint) - .option("spark.cosmos.accountKey", cosmosMasterKey) - .option("spark.cosmos.database", cosmosDatabase) - .option("spark.cosmos.container", cosmosContainersWithPkAsPartitionKey) - .option("spark.cosmos.write.bulk.transactional", "true") - .option("spark.cosmos.write.bulk.enabled", "true") - .mode(SaveMode.Append) - .save() - - // Verify items were created - val item1 = container.readItem(item1Id, new PartitionKey(partitionKeyValue), classOf[ObjectNode]).block() - item1 should not be null - item1.getItem.get("name").asText() shouldEqual "Alice" - item1.getItem.get("age").asInt() shouldEqual 30 - - val item2 = container.readItem(item2Id, new PartitionKey(partitionKeyValue), classOf[ObjectNode]).block() - item2 should not be null - item2.getItem.get("name").asText() shouldEqual "Bob" - item2.getItem.get("age").asInt() shouldEqual 25 - } - - it should "preserve order with simplified schema" in { - val cosmosEndpoint = TestConfigurations.HOST - val cosmosMasterKey = TestConfigurations.MASTER_KEY - val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(cosmosContainersWithPkAsPartitionKey) - val partitionKeyValue = UUID.randomUUID().toString - val baseId = s"order-test-${UUID.randomUUID()}" - - // Create batch with multiple operations in specific order - val simplifiedSchema = StructType(Seq( - StructField("id", StringType, nullable = false), - StructField("pk", StringType, nullable = false), - StructField("sequence", IntegerType, nullable = false) - )) - - val batchOperations = (1 to 10).map { i => - Row(s"$baseId-$i", partitionKeyValue, i) - } - - val operationsDf = spark.createDataFrame(batchOperations.asJava, simplifiedSchema) - - operationsDf.write - .format("cosmos.oltp") - .option("spark.cosmos.accountEndpoint", cosmosEndpoint) - .option("spark.cosmos.accountKey", cosmosMasterKey) - .option("spark.cosmos.database", cosmosDatabase) - .option("spark.cosmos.container", cosmosContainersWithPkAsPartitionKey) - .option("spark.cosmos.write.bulk.transactional", "true") - .option("spark.cosmos.write.bulk.enabled", "true") - .mode(SaveMode.Append) - .save() - - // Verify items were created in order - (1 to 10).foreach { i => - val item = container.readItem(s"$baseId-$i", new PartitionKey(partitionKeyValue), classOf[ObjectNode]).block() - item should not be null - item.getItem.get("sequence").asInt() shouldEqual i - } - } - - it should "support update operations with simplified schema" in { - val cosmosEndpoint = TestConfigurations.HOST - val cosmosMasterKey = TestConfigurations.MASTER_KEY - val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(cosmosContainersWithPkAsPartitionKey) - val partitionKeyValue = UUID.randomUUID().toString - val itemId = s"test-item-${UUID.randomUUID()}" - - // First create an item - val initialDoc = Utils.getSimpleObjectMapper.createObjectNode() - initialDoc.put("id", itemId) - initialDoc.put("pk", partitionKeyValue) - initialDoc.put("name", "InitialName") - initialDoc.put("version", 1) - container.createItem(initialDoc).block() - - // Now update it using transactional batch (defaults to upsert) - val simplifiedSchema = 
StructType(Seq( - StructField("id", StringType, nullable = false), - StructField("pk", StringType, nullable = false), - StructField("name", StringType, nullable = false), - StructField("version", IntegerType, nullable = false) - )) - - val updateOperations = Seq( - Row(itemId, partitionKeyValue, "UpdatedName", 2) - ) - - val operationsDf = spark.createDataFrame(updateOperations.asJava, simplifiedSchema) - - operationsDf.write - .format("cosmos.oltp") - .option("spark.cosmos.accountEndpoint", cosmosEndpoint) - .option("spark.cosmos.accountKey", cosmosMasterKey) - .option("spark.cosmos.database", cosmosDatabase) - .option("spark.cosmos.container", cosmosContainersWithPkAsPartitionKey) - .option("spark.cosmos.write.bulk.transactional", "true") - .option("spark.cosmos.write.bulk.enabled", "true") - .mode(SaveMode.Append) - .save() - - // Verify item was updated - val updatedItem = container.readItem(itemId, new PartitionKey(partitionKeyValue), classOf[ObjectNode]).block() - updatedItem should not be null - updatedItem.getItem.get("name").asText() shouldEqual "UpdatedName" - updatedItem.getItem.get("version").asInt() shouldEqual 2 - } - - it should "fail when more than 100 operations for a single partition key" in { - val cosmosEndpoint = TestConfigurations.HOST - val cosmosMasterKey = TestConfigurations.MASTER_KEY - - val partitionKeyValue = UUID.randomUUID().toString - - // Create 101 operations for the same partition key - val schema = StructType(Seq( - StructField("id", StringType, nullable = false), - StructField("pk", StringType, nullable = false), - StructField("name", StringType, nullable = false) - )) - - val batchOperations = (1 to 101).map { i => - Row(s"item-$i-${UUID.randomUUID()}", partitionKeyValue, s"Name-$i") - } - - val operationsDf = spark.createDataFrame(batchOperations.asJava, schema) - - // Should throw exception due to exceeding 100 operations per partition key limit - val exception = intercept[Exception] { - operationsDf.write - .format("cosmos.oltp") - .option("spark.cosmos.accountEndpoint", cosmosEndpoint) - .option("spark.cosmos.accountKey", cosmosMasterKey) - .option("spark.cosmos.database", cosmosDatabase) - .option("spark.cosmos.container", cosmosContainersWithPkAsPartitionKey) - .option("spark.cosmos.write.bulk.transactional", "true") - .option("spark.cosmos.write.bulk.enabled", "true") - .mode(SaveMode.Append) - .save() - } - - // Spark wraps our exception in "Writing job aborted", check the root cause - val rootCause = getRootCause(exception) - // Cosmos DB rejects batches with > 100 operations - rootCause.getMessage should include("Batch request has more operations than what is supported") - } - - "Transactional Batch with Hierarchical Partition Keys" should "create items atomically with testPrimaryKey and testSecondaryKey" in { - val cosmosEndpoint = TestConfigurations.HOST - val cosmosMasterKey = TestConfigurations.MASTER_KEY - - // Create container with hierarchical partition keys (testPrimaryKey, testSecondaryKey) - val containerName = s"test-hpk-${UUID.randomUUID()}" - val containerProperties = new com.azure.cosmos.models.CosmosContainerProperties( - containerName, - new com.azure.cosmos.models.PartitionKeyDefinition() - ) - val paths = new java.util.ArrayList[String]() - paths.add("/testPrimaryKey") - paths.add("/testSecondaryKey") - containerProperties.getPartitionKeyDefinition.setPaths(paths) - containerProperties.getPartitionKeyDefinition.setKind(com.azure.cosmos.models.PartitionKind.MULTI_HASH) - 
containerProperties.getPartitionKeyDefinition.setVersion(com.azure.cosmos.models.PartitionKeyDefinitionVersion.V2) - cosmosClient.getDatabase(cosmosDatabase).createContainer(containerProperties).block() - val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(containerName) - - try { - val permId = "MSFT" - val sourceId = "Bloomberg" - val item1Id = s"${UUID.randomUUID()}" - val item2Id = s"${UUID.randomUUID()}" - - // Create batch operations with hierarchical partition keys - val schema = StructType(Seq( - StructField("id", StringType, nullable = false), - StructField("testPrimaryKey", StringType, nullable = false), - StructField("testSecondaryKey", StringType, nullable = false), - StructField("testPrice", org.apache.spark.sql.types.DoubleType, nullable = false) - )) - - val batchOperations = Seq( - Row(item1Id, permId, sourceId, 100.5), - Row(item2Id, permId, sourceId, 101.25) - ) - - val operationsDf = spark.createDataFrame(batchOperations.asJava, schema) - - operationsDf.write - .format("cosmos.oltp") - .option("spark.cosmos.accountEndpoint", cosmosEndpoint) - .option("spark.cosmos.accountKey", cosmosMasterKey) - .option("spark.cosmos.database", cosmosDatabase) - .option("spark.cosmos.container", containerName) - .option("spark.cosmos.write.bulk.transactional", "true") - .option("spark.cosmos.write.bulk.enabled", "true") - .mode(SaveMode.Append) - .save() - - // Verify items were created - val pk = new PartitionKeyBuilder().add(permId).add(sourceId).build() - val item1 = container.readItem(item1Id, pk, classOf[ObjectNode]).block() - item1 should not be null - item1.getItem.get("testPrice").asDouble() shouldEqual 100.5 - - val item2 = container.readItem(item2Id, pk, classOf[ObjectNode]).block() - item2 should not be null - item2.getItem.get("testPrice").asDouble() shouldEqual 101.25 - } finally { - // Clean up container - container.delete().block() - } - } - - it should "handle temporal updates for financial instrument timelines" in { - val cosmosEndpoint = TestConfigurations.HOST - val cosmosMasterKey = TestConfigurations.MASTER_KEY - - // Create container with hierarchical partition keys (testPrimaryKey, testSecondaryKey) - val containerName = s"test-hpk-temporal-${UUID.randomUUID()}" - val containerProperties = new com.azure.cosmos.models.CosmosContainerProperties( - containerName, - new com.azure.cosmos.models.PartitionKeyDefinition() - ) - val paths = new java.util.ArrayList[String]() - paths.add("/testPrimaryKey") - paths.add("/testSecondaryKey") - containerProperties.getPartitionKeyDefinition.setPaths(paths) - containerProperties.getPartitionKeyDefinition.setKind(com.azure.cosmos.models.PartitionKind.MULTI_HASH) - containerProperties.getPartitionKeyDefinition.setVersion(com.azure.cosmos.models.PartitionKeyDefinitionVersion.V2) - cosmosClient.getDatabase(cosmosDatabase).createContainer(containerProperties).block() - val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(containerName) - - try { - val permId = "MSFT" - val sourceId = "Bloomberg" - val oldRecordId = "2024-01-01T00:00:00Z" - val newRecordId = "2024-06-01T00:00:00Z" - - // First, create initial record - val initialDoc = Utils.getSimpleObjectMapper.createObjectNode() - initialDoc.put("id", oldRecordId) - initialDoc.put("testPrimaryKey", permId) - initialDoc.put("testSecondaryKey", sourceId) - initialDoc.put("testPrice", 100.0) - initialDoc.putNull("valid_to") - container.createItem(initialDoc).block() - - // Now perform atomic temporal update: close old record + create new record - val 
schema = StructType(Seq( - StructField("id", StringType, nullable = false), - StructField("testPrimaryKey", StringType, nullable = false), - StructField("testSecondaryKey", StringType, nullable = false), - StructField("testPrice", org.apache.spark.sql.types.DoubleType, nullable = false), - StructField("valid_to", StringType, nullable = true) - )) - - val batchOperations = Seq( - // Close old record by setting valid_to (using upsert to replace) - Row(oldRecordId, permId, sourceId, 100.0, "2024-06-01T00:00:00Z"), - // Create new record with new price - Row(newRecordId, permId, sourceId, 150.0, null) - ) - - val operationsDf = spark.createDataFrame(batchOperations.asJava, schema) - - operationsDf.write - .format("cosmos.oltp") - .option("spark.cosmos.accountEndpoint", cosmosEndpoint) - .option("spark.cosmos.accountKey", cosmosMasterKey) - .option("spark.cosmos.database", cosmosDatabase) - .option("spark.cosmos.container", containerName) - .option("spark.cosmos.write.bulk.transactional", "true") - .option("spark.cosmos.write.bulk.enabled", "true") - .mode(SaveMode.Append) - .save() - - // Verify old record was closed - val pk = new PartitionKeyBuilder().add(permId).add(sourceId).build() - val oldRecord = container.readItem(oldRecordId, pk, classOf[ObjectNode]).block() - oldRecord should not be null - oldRecord.getItem.get("valid_to").asText() shouldEqual "2024-06-01T00:00:00Z" - - // Verify new record was created - val newRecord = container.readItem(newRecordId, pk, classOf[ObjectNode]).block() - newRecord should not be null - newRecord.getItem.get("testPrice").asDouble() shouldEqual 150.0 - newRecord.getItem.get("valid_to").isNull shouldBe true - } finally { - // Clean up container - container.delete().block() - } - } - - it should "handle operations across multiple testPrimaryKey/testSecondaryKey combinations" in { - val cosmosEndpoint = TestConfigurations.HOST - val cosmosMasterKey = TestConfigurations.MASTER_KEY - - // Create container with hierarchical partition keys (testPrimaryKey, testSecondaryKey) - val containerName = s"test-hpk-multi-${UUID.randomUUID()}" - val containerProperties = new com.azure.cosmos.models.CosmosContainerProperties( - containerName, - new com.azure.cosmos.models.PartitionKeyDefinition() - ) - val paths = new java.util.ArrayList[String]() - paths.add("/testPrimaryKey") - paths.add("/testSecondaryKey") - containerProperties.getPartitionKeyDefinition.setPaths(paths) - containerProperties.getPartitionKeyDefinition.setKind(com.azure.cosmos.models.PartitionKind.MULTI_HASH) - containerProperties.getPartitionKeyDefinition.setVersion(com.azure.cosmos.models.PartitionKeyDefinitionVersion.V2) - cosmosClient.getDatabase(cosmosDatabase).createContainer(containerProperties).block() - val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(containerName) - - try { - // Create operations for different PermId/SourceId combinations - val schema = StructType(Seq( - StructField("id", StringType, nullable = false), - StructField("testPrimaryKey", StringType, nullable = false), - StructField("testSecondaryKey", StringType, nullable = false), - StructField("testPrice", org.apache.spark.sql.types.DoubleType, nullable = false) - )) - - val batchOperations = Seq( - // MSFT from Bloomberg - Row(s"${UUID.randomUUID()}", "MSFT", "Bloomberg", 100.0), - Row(s"${UUID.randomUUID()}", "MSFT", "Bloomberg", 101.0), - // MSFT from Reuters - Row(s"${UUID.randomUUID()}", "MSFT", "Reuters", 100.5), - // AAPL from Bloomberg - Row(s"${UUID.randomUUID()}", "AAPL", "Bloomberg", 150.0) - ) - 
- val operationsDf = spark.createDataFrame(batchOperations.asJava, schema) - - operationsDf.write - .format("cosmos.oltp") - .option("spark.cosmos.accountEndpoint", cosmosEndpoint) - .option("spark.cosmos.accountKey", cosmosMasterKey) - .option("spark.cosmos.database", cosmosDatabase) - .option("spark.cosmos.container", containerName) - .option("spark.cosmos.write.bulk.transactional", "true") - .option("spark.cosmos.write.bulk.enabled", "true") - .mode(SaveMode.Append) - .save() - - // All operations should succeed since they're across different partition key combinations - // Each unique PermId/SourceId combination is treated as a separate transactional batch - } finally { - // Clean up container - container.delete().block() - } - } - - it should "fail when more than 100 operations for a single hierarchical partition key" in { - val cosmosEndpoint = TestConfigurations.HOST - val cosmosMasterKey = TestConfigurations.MASTER_KEY - - // Create container with hierarchical partition keys (testPrimaryKey, testSecondaryKey) - val containerName = s"test-hpk-limit-${UUID.randomUUID()}" - val containerProperties = new com.azure.cosmos.models.CosmosContainerProperties( - containerName, - new com.azure.cosmos.models.PartitionKeyDefinition() - ) - val paths = new java.util.ArrayList[String]() - paths.add("/testPrimaryKey") - paths.add("/testSecondaryKey") - containerProperties.getPartitionKeyDefinition.setPaths(paths) - containerProperties.getPartitionKeyDefinition.setKind(com.azure.cosmos.models.PartitionKind.MULTI_HASH) - containerProperties.getPartitionKeyDefinition.setVersion(com.azure.cosmos.models.PartitionKeyDefinitionVersion.V2) - cosmosClient.getDatabase(cosmosDatabase).createContainer(containerProperties).block() - val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(containerName) - - val permId = "MSFT" - val sourceId = "Bloomberg" - - // Create 101 operations for the same hierarchical partition key - val schema = StructType(Seq( - StructField("id", StringType, nullable = false), - StructField("testPrimaryKey", StringType, nullable = false), - StructField("testSecondaryKey", StringType, nullable = false), - StructField("testPrice", org.apache.spark.sql.types.DoubleType, nullable = false) - )) - - val batchOperations = (1 to 101).map { i => - Row(s"${UUID.randomUUID()}", permId, sourceId, 100.0 + i) - } - - val operationsDf = spark.createDataFrame(batchOperations.asJava, schema) - - // Should throw exception due to exceeding 100 operations per partition key limit - val exception = intercept[Exception] { - operationsDf.write - .format("cosmos.oltp") - .option("spark.cosmos.accountEndpoint", cosmosEndpoint) - .option("spark.cosmos.accountKey", cosmosMasterKey) - .option("spark.cosmos.database", cosmosDatabase) - .option("spark.cosmos.container", containerName) - .option("spark.cosmos.write.bulk.transactional", "true") - .option("spark.cosmos.write.bulk.enabled", "true") - .mode(SaveMode.Append) - .save() - } - - // Spark wraps our exception in "Writing job aborted", check the root cause - val rootCause = getRootCause(exception) - // Cosmos DB rejects batches with > 100 operations - rootCause.getMessage should include("Batch request has more operations than what is supported") - - // Clean up container after assertions pass - try { - container.delete().block() - } catch { - case e: Exception => // Ignore cleanup failures - } - } - - it should "handle multiple partition keys with repartitioning" in { - val cosmosEndpoint = TestConfigurations.HOST - val cosmosMasterKey = 
TestConfigurations.MASTER_KEY - val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(cosmosContainersWithPkAsPartitionKey) - - // Create operations for multiple partition keys intentionally in random order - val pk1 = UUID.randomUUID().toString - val pk2 = UUID.randomUUID().toString - val pk3 = UUID.randomUUID().toString - - val schema = StructType(Seq( - StructField("id", StringType, nullable = false), - StructField("pk", StringType, nullable = false), - StructField("value", StringType, nullable = false) - )) - - // Interleave operations for different partition keys - // With repartitioning, operations should be grouped by partition key - val batchOperations = Seq( - Row(s"${pk1}_1", pk1, "value1"), - Row(s"${pk2}_1", pk2, "value1"), - Row(s"${pk3}_1", pk3, "value1"), - Row(s"${pk1}_2", pk1, "value2"), - Row(s"${pk2}_2", pk2, "value2"), - Row(s"${pk3}_2", pk3, "value2"), - Row(s"${pk1}_3", pk1, "value3"), - Row(s"${pk2}_3", pk2, "value3"), - Row(s"${pk3}_3", pk3, "value3") - ) - - val operationsDf = spark.createDataFrame(batchOperations.asJava, schema) - - // Execute transactional batch - operationsDf.write - .format("cosmos.oltp") - .option("spark.cosmos.accountEndpoint", cosmosEndpoint) - .option("spark.cosmos.accountKey", cosmosMasterKey) - .option("spark.cosmos.database", cosmosDatabase) - .option("spark.cosmos.container", cosmosContainersWithPkAsPartitionKey) - .option("spark.cosmos.write.bulk.transactional", "true") - .option("spark.cosmos.write.bulk.enabled", "true") - .mode(SaveMode.Append) - .save() - - // Verify all operations succeeded - Seq(pk1, pk2, pk3).foreach { pk => - (1 to 3).foreach { i => - val item = container.readItem(s"${pk}_$i", new PartitionKey(pk), classOf[ObjectNode]).block() - item should not be null - item.getItem.get("value").asText() shouldEqual s"value$i" - } - } - } - - it should "enforce batch-level backpressure with small maxPendingOperations" in { - val cosmosEndpoint = TestConfigurations.HOST - val cosmosMasterKey = TestConfigurations.MASTER_KEY - val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(cosmosContainersWithPkAsPartitionKey) - - // Use a small maxPendingOperations value to force batch-level limiting - // With maxPendingOperations=50, maxPendingBatches = 50/50 = 1 - // This means only 1 batch should be in-flight at a time - val maxPendingOperations = 50 - - // Create 200 operations across 4 batches (50 operations per partition key = 1 batch each) - // This will test that the semaphore properly limits concurrent batches - val partitionKeys = (1 to 4).map(_ => UUID.randomUUID().toString) - - val schema = StructType(Seq( - StructField("id", StringType, nullable = false), - StructField("pk", StringType, nullable = false), - StructField("value", IntegerType, nullable = false) - )) - - // Create 50 operations per partition key (forms 1 batch per PK) - val allOperations = partitionKeys.flatMap { pk => - (1 to 50).map { i => - Row(s"item-$i-${UUID.randomUUID()}", pk, i) - } - } - - val operationsDf = spark.createDataFrame(allOperations.asJava, schema) - - // Execute with very small maxPendingOperations to force batch limiting - operationsDf.write - .format("cosmos.oltp") - .option("spark.cosmos.accountEndpoint", cosmosEndpoint) - .option("spark.cosmos.accountKey", cosmosMasterKey) - .option("spark.cosmos.database", cosmosDatabase) - .option("spark.cosmos.container", cosmosContainersWithPkAsPartitionKey) - .option("spark.cosmos.write.bulk.transactional", "true") - .option("spark.cosmos.write.bulk.enabled", "true") - 
.option("spark.cosmos.write.bulk.maxPendingOperations", maxPendingOperations.toString) - .mode(SaveMode.Append) - .save() - - // Verify all items were created successfully - partitionKeys.foreach { pk => - val queryResult = container - .queryItems(s"SELECT VALUE COUNT(1) FROM c WHERE c.pk = '$pk'", classOf[Long]) - .collectList() - .block() - - val count = if (queryResult.isEmpty) 0L else queryResult.get(0) - assert(count == 50, s"Expected 50 items for partition key $pk, but found $count") - } - - // If we get here without deadlock or timeout, batch-level backpressure is working - // The test verifies: - // 1. Operations complete successfully even with tight batch limit - // 2. No deadlocks occur from semaphore management - // 3. All batches are properly tracked and released - } - - "Transactional Batch" should "handle multiple batches for same partition key" in { - val cosmosEndpoint = TestConfigurations.HOST - val cosmosMasterKey = TestConfigurations.MASTER_KEY - val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(cosmosContainersWithPkAsPartitionKey) - - // Create two sets of operations for the same partition key - // This tests that multiple successful batches can be written to the same partition key - val partitionKeyValue = UUID.randomUUID().toString - - val schema = StructType(Seq( - StructField("id", StringType, nullable = false), - StructField("pk", StringType, nullable = false), - StructField("counter", IntegerType, nullable = false) - )) - - // First, write initial items that will later be updated transactionally - val initialItems = (1 to 10).map { i => - Row(s"item-$i", partitionKeyValue, 0) - } - - val initialDf = spark.createDataFrame(initialItems.asJava, schema) - initialDf.write - .format("cosmos.oltp") - .option("spark.cosmos.accountEndpoint", cosmosEndpoint) - .option("spark.cosmos.accountKey", cosmosMasterKey) - .option("spark.cosmos.database", cosmosDatabase) - .option("spark.cosmos.container", cosmosContainersWithPkAsPartitionKey) - .option("spark.cosmos.write.bulk.transactional", "true") - .option("spark.cosmos.write.bulk.enabled", "true") - .mode(SaveMode.Append) - .save() - - // Now update items 1-5 to counter=1, then items 6-10 to counter=2 - // Both are atomic batches for the same partition key - // If retries from the first batch interleave with the second batch, - // atomicity would be violated - val batch1 = (1 to 5).map { i => - Row(s"item-$i", partitionKeyValue, 1) - } - - val batch2 = (6 to 10).map { i => - Row(s"item-$i", partitionKeyValue, 2) - } - - val allUpdates = batch1 ++ batch2 - val updatesDf = spark.createDataFrame(allUpdates.asJava, schema) - - // Delete existing items first since Overwrite mode is not supported in transactional mode - (1 to 10).foreach { i => - container.deleteItem(s"item-$i", new PartitionKey(partitionKeyValue), null).block() - } - - updatesDf.write - .format("cosmos.oltp") - .option("spark.cosmos.accountEndpoint", cosmosEndpoint) - .option("spark.cosmos.accountKey", cosmosMasterKey) - .option("spark.cosmos.database", cosmosDatabase) - .option("spark.cosmos.container", cosmosContainersWithPkAsPartitionKey) - .option("spark.cosmos.write.bulk.transactional", "true") - .option("spark.cosmos.write.bulk.enabled", "true") - .option("spark.cosmos.write.bulk.maxPendingOperations", "10") // Force separate batches - .mode(SaveMode.Append) - .save() - - // Verify the final state: all items should have their expected counter values - // If interleaving occurred, some updates might have been lost or inconsistent - (1 to 
5).foreach { i => - val item = container.readItem( - s"item-$i", - new PartitionKey(partitionKeyValue), - classOf[ObjectNode] - ).block() - - assert(item != null, s"Item item-$i should exist") - assert(item.getItem.get("counter").asInt() == 1, - s"Item item-$i should have counter=1, but got ${item.getItem.get("counter").asInt()}") - } - - (6 to 10).foreach { i => - val item = container.readItem( - s"item-$i", - new PartitionKey(partitionKeyValue), - classOf[ObjectNode] - ).block() - - assert(item != null, s"Item item-$i should exist") - assert(item.getItem.get("counter").asInt() == 2, - s"Item item-$i should have counter=2, but got ${item.getItem.get("counter").asInt()}") - } - } - - it should "handle batch-level retries for retriable errors without interleaving" in { - val cosmosEndpoint = TestConfigurations.HOST - val cosmosMasterKey = TestConfigurations.MASTER_KEY - - val partitionKeyValue = UUID.randomUUID().toString - - val schema = StructType(Seq( - StructField("id", StringType, nullable = false), - StructField("pk", StringType, nullable = false), - StructField("counter", IntegerType, nullable = false) - )) - - // Configuration for Spark connector - must match exactly for cache lookup - val cfg = Map( - "spark.cosmos.accountEndpoint" -> cosmosEndpoint, - "spark.cosmos.accountKey" -> cosmosMasterKey, - "spark.cosmos.database" -> cosmosDatabase, - "spark.cosmos.container" -> cosmosContainersWithPkAsPartitionKey - ) - - // Create initial items with counter = 0 - // This FIRST write ensures Spark creates the client and caches it - val initialItems = (1 to 10).map { i => - Row(s"item-$i", partitionKeyValue, 0) - } - - val initialDf = spark.createDataFrame(initialItems.asJava, schema) - initialDf.write - .format("cosmos.oltp") - .options(cfg) - .option("spark.cosmos.write.bulk.transactional", "true") - .option("spark.cosmos.write.bulk.enabled", "true") - .mode(SaveMode.Append) - .save() - - // NOW get the actual client that Spark created and cached - val clientFromCache = udf.CosmosAsyncClientCache - .getCosmosClientFromCache(cfg) - .getClient - .asInstanceOf[CosmosAsyncClient] - - val container = clientFromCache.getDatabase(cosmosDatabase).getContainer(cosmosContainersWithPkAsPartitionKey) - - // Configure fault injection to inject retriable 429 TOO_MANY_REQUEST errors on BATCH_ITEM operations - // 429 errors are retriable and should trigger batch-level retry without aborting the job - // This tests that transactional batches handle retries at the batch level - val faultInjectionRule = new FaultInjectionRuleBuilder("batch-retry-interleaving-test-" + UUID.randomUUID()) - .condition( - new FaultInjectionConditionBuilder() - .operationType(FaultInjectionOperationType.BATCH_ITEM) - .build() - ) - .result( - FaultInjectionResultBuilders - .getResultBuilder(FaultInjectionServerErrorType.TOO_MANY_REQUEST) - .times(1) // Inject 429 on first batch operation, then allow retries to succeed - .build() - ) - .duration(Duration.ofMinutes(5)) - .build() - - // Configure the fault injection rule on the container - CosmosFaultInjectionHelper.configureFaultInjectionRules(container, java.util.Collections.singletonList(faultInjectionRule)).block() - - try { - // Now write just ONE more item using NEW ID to avoid conflicts with the initial write - // This is the absolute simplest test case to verify batch-level retry handling - // With maxPendingOperations=1, this single item becomes its own batch - val singleItem = Seq(Row("item-11", partitionKeyValue, 1)) - - val updatesDf = 
spark.createDataFrame(singleItem.asJava, schema) - - updatesDf.write - .format("cosmos.oltp") - .options(cfg) - .option("spark.cosmos.write.bulk.transactional", "true") - .option("spark.cosmos.write.bulk.enabled", "true") - .option("spark.cosmos.write.bulk.maxPendingOperations", "1") // Ensure minimal batch size - .mode(SaveMode.Append) - .save() - - // Verify that fault injection triggered and was retried successfully - // This confirms that retriable errors (429) trigger batch-level retries - val hitCount = faultInjectionRule.getHitCount - assert(hitCount > 0, s"Fault injection should have tracked BATCH_ITEM operations, but hit count was $hitCount") - - // Verify the single item was written correctly despite the injected 429 error and retry - val item11 = container.readItem("item-11", new PartitionKey(partitionKeyValue), classOf[ObjectNode]).block() - assert(item11 != null, "Item item-11 should exist") - assert(item11.getItem.get("counter").asInt() == 1, - s"Item item-11 should have counter=1, but got ${item11.getItem.get("counter").asInt()}") - } finally { - // Clean up: disable the fault injection rule - faultInjectionRule.disable() - } - } - -}
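
For context on what remains supported after this patch: the connector still accepts the standard bulk write options shown in the removed Ingestion.md example, just without `spark.cosmos.write.bulk.transactional`, which `CosmosWriteConfig` now hard-codes internally to `Some(false)`. Below is a minimal sketch of that remaining non-transactional bulk write path; it reuses the variable names from the removed example (`cosmosEndpoint`, `cosmosKey`, `cosmosDatabase`, `cosmosContainer`) and wraps them in a hypothetical `writeBulk` helper, so treat it as illustrative rather than part of this change.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

// Sketch only: standard (non-transactional) bulk upsert that this patch leaves in place.
// Assumes the endpoint/key/database/container values are defined as in the removed
// Ingestion.md example and that `df` matches the target container's document shape.
def writeBulk(df: DataFrame,
              cosmosEndpoint: String,
              cosmosKey: String,
              cosmosDatabase: String,
              cosmosContainer: String): Unit = {
  df.write
    .format("cosmos.oltp")
    .option("spark.cosmos.accountEndpoint", cosmosEndpoint)
    .option("spark.cosmos.accountKey", cosmosKey)
    .option("spark.cosmos.database", cosmosDatabase)
    .option("spark.cosmos.container", cosmosContainer)
    .option("spark.cosmos.write.bulk.enabled", "true") // bulk mode is the default; shown for clarity
    .mode(SaveMode.Append) // Append mode with the connector's default ItemOverwrite (upsert) strategy
    .save()
}
```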