@@ -67,7 +67,6 @@
| `spark.cosmos.write.point.maxConcurrency` | None | Cosmos DB Item Write Max concurrency. If not specified, it will be determined based on the Spark executor VM Size |
| `spark.cosmos.write.bulk.maxPendingOperations` | None | Cosmos DB Item Write bulk mode maximum pending operations. Defines a limit of bulk operations being processed concurrently. If not specified, it will be determined based on the Spark executor VM Size. If the volume of data is large for the provisioned throughput on the destination container, this setting can be adjusted by using the estimate of `1000 x Cores` |
| `spark.cosmos.write.bulk.enabled` | `true` | Cosmos DB Item Write bulk enabled |
| `spark.cosmos.write.bulk.transactional` | `false` | Enable transactional batch mode for bulk writes. When enabled, all operations for the same partition key are executed atomically (all succeed or all fail). Requires ordering and clustering by partition key columns. Only supports upsert operations. Cannot exceed 100 operations or 2MB per partition key. **Note**: For containers using hierarchical partition keys (HPK), transactional scope applies only to **logical partitions** (complete partition key paths), not partial top-level keys. See [Transactional Batch documentation](https://learn.microsoft.com/azure/cosmos-db/transactional-batch) for details. |
| `spark.cosmos.write.bulk.targetedPayloadSizeInBytes` | `220201` | When the targeted payload size is reached for buffered documents, the request is sent to the backend. The default value is optimized for small documents <= 10 KB - when documents often exceed 110 KB, it can help to increase this value up to about `1500000` (should still be smaller than 2 MB). |
| `spark.cosmos.write.bulk.initialBatchSize` | `100` | Cosmos DB initial bulk micro batch size - a micro batch will be flushed to the backend when the number of documents enqueued exceeds this size - or the target payload size is met. The micro batch size is automatically tuned based on the throttling rate. By default, the initial micro batch size is 100. Reduce this if you want to avoid the first few requests consuming too many RUs. |
| `spark.cosmos.write.bulk.maxBatchSize` | `100` | Cosmos DB max. bulk micro batch size - a micro batch will be flushed to the backend when the number of documents enqueued exceeds this size - or the target payload size is met. The micro batch size is automatically tuned based on the throttling rate. By default, the max. micro batch size is 100. Use this setting only when migrating Spark 2.4 workloads - for other scenarios, relying on auto-tuning combined with throughput control will result in a better experience. |
| `spark.cosmos.write.flush.noProgress.maxIntervalInSeconds` | `180` | The time interval in seconds that bulk write operations will wait before forcing a retry when no progress can be made. The retry reinitializes the bulk write process, so any delays remaining after the retry indicate actual service issues. The default of 3 minutes should be sufficient to prevent false negatives during short service-side write unavailability - for example, during partition splits or merges. Increase it only if you regularly see these transient errors exceed 180 seconds. |
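
To illustrate how these settings fit together, here is a minimal sketch (not taken from the official docs) of a bulk write that applies the `1000 x Cores` estimation for `maxPendingOperations`; the account endpoint, key, database, container, and the DataFrame `df` are placeholders:

```python
# Minimal sketch - assumes an existing Spark DataFrame `df`, the Cosmos DB Spark connector
# on the classpath, and placeholder account/database/container values.
executor_cores = 8                     # assumed cores per Spark executor; adjust to your cluster
max_pending = 1000 * executor_cores    # the `1000 x Cores` estimation from the table above

df.write \
    .format("cosmos.oltp") \
    .option("spark.cosmos.accountEndpoint", "https://<account>.documents.azure.com:443/") \
    .option("spark.cosmos.accountKey", "<account-key>") \
    .option("spark.cosmos.database", "<database>") \
    .option("spark.cosmos.container", "<container>") \
    .option("spark.cosmos.write.bulk.enabled", "true") \
    .option("spark.cosmos.write.bulk.maxPendingOperations", str(max_pending)) \
    .option("spark.cosmos.write.bulk.initialBatchSize", "10") \
    .mode("Append") \
    .save()
```

Lowering `initialBatchSize` is only useful when the very first requests must not consume too many RUs; otherwise the auto-tuned defaults are usually sufficient.
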
69 changes: 0 additions & 69 deletions sdk/cosmos/azure-cosmos-spark_3/docs/scenarios/Ingestion.md
@@ -79,75 +79,6 @@ When your container has a "unique key constraint policy" any 409 "Conflict" (ind
- For `ItemOverwrite`, a 409 Conflict due to a unique key violation will result in an error - and the Spark job will fail. *NOTE: Conflicts due to pk+id being identical to another document won't even result in a 409 - because with Upsert the existing document would simply be updated.*
- For `ItemAppend`, any unique key constraint violation will be ignored - just like conflicts on pk+id. The write strategy is selected via the `spark.cosmos.write.strategy` option, as sketched below.
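
As a brief illustration (a sketch, not a complete job - the account endpoint, key, database, container, and the DataFrame `df` are placeholders), the conflict-handling behavior described above is chosen by setting `spark.cosmos.write.strategy`:

```python
# Sketch only - placeholders for the account details; `df` is an existing DataFrame.
df.write \
    .format("cosmos.oltp") \
    .option("spark.cosmos.accountEndpoint", "https://<account>.documents.azure.com:443/") \
    .option("spark.cosmos.accountKey", "<account-key>") \
    .option("spark.cosmos.database", "<database>") \
    .option("spark.cosmos.container", "<container>") \
    .option("spark.cosmos.write.strategy", "ItemAppend") \
    .mode("Append") \
    .save()
```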

### Transactional batch writes

For scenarios requiring atomic all-or-nothing semantics within a partition, you can enable transactional batch writes using the `spark.cosmos.write.bulk.transactional` configuration. When enabled, all operations for a single partition key value either succeed or fail together.

#### Configuration

**Python example:**
```python
df.write \
.format("cosmos.oltp") \
.option("spark.cosmos.accountEndpoint", cosmosEndpoint) \
.option("spark.cosmos.accountKey", cosmosKey) \
.option("spark.cosmos.database", cosmosDatabase) \
.option("spark.cosmos.container", cosmosContainer) \
.option("spark.cosmos.write.bulk.enabled", "true") \
.option("spark.cosmos.write.bulk.transactional", "true") \
.mode("Append") \
.save()
```

**Scala example:**
```scala
df.write
.format("cosmos.oltp")
.option("spark.cosmos.accountEndpoint", cosmosEndpoint)
.option("spark.cosmos.accountKey", cosmosKey)
.option("spark.cosmos.database", cosmosDatabase)
.option("spark.cosmos.container", cosmosContainer)
.option("spark.cosmos.write.bulk.enabled", "true")
.option("spark.cosmos.write.bulk.transactional", "true")
.mode(SaveMode.Append)
.save()
```

#### Characteristics and limitations

- **Atomic semantics**: All operations for the same partition key succeed or all fail (rollback)
- **Operation type**: Only upsert operations are supported (equivalent to `ItemOverwrite` write strategy)
- **Partition grouping**: On Spark 3.5+, the connector automatically distributes and orders data by the partition key columns; on Spark 3.3/3.4, manually pre-sorting by partition key is recommended for optimal performance (see the sketch after this list)
- **Size limits**: Maximum 100 operations per transaction; maximum 2MB total payload per transaction. These limits are **enforced service-side by Azure Cosmos DB** and cannot be configured client-side.
- **Partition key requirement**: All operations in a transaction must share the same partition key value
- **Hierarchical partition keys**: For containers using hierarchical partition keys (HPK), transactional scope applies only to **logical partitions** (complete partition key paths), not partial top-level keys
- **Bulk mode required**: Must have `spark.cosmos.write.bulk.enabled=true` (enabled by default)
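
The following is a minimal pre-sorting sketch for Spark 3.3/3.4, where manual ordering by the partition key is recommended (on Spark 3.5+ the connector handles distribution and ordering automatically). `tenantId` is a hypothetical partition key column and the account values are placeholders:

```python
from pyspark.sql import functions as F

# Hypothetical partition key column `tenantId`: co-locate and order rows of the same
# logical partition so transactional batches can be formed per partition key value.
prepared = df.repartition(F.col("tenantId")).sortWithinPartitions(F.col("tenantId"))

prepared.write \
    .format("cosmos.oltp") \
    .option("spark.cosmos.accountEndpoint", "https://<account>.documents.azure.com:443/") \
    .option("spark.cosmos.accountKey", "<account-key>") \
    .option("spark.cosmos.database", "<database>") \
    .option("spark.cosmos.container", "<container>") \
    .option("spark.cosmos.write.bulk.enabled", "true") \
    .option("spark.cosmos.write.bulk.transactional", "true") \
    .mode("Append") \
    .save()
```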

#### Use cases

Transactional batch writes are ideal for:
- Financial transactions requiring consistency across multiple documents (e.g., double-entry bookkeeping, account balance updates)
- Order processing where order header and line items must be committed together
- Multi-document updates that must be atomic (e.g., inventory adjustments)
- **Temporal data scenarios** where historical versioning must be consistent (e.g., financial instrument versioning, audit trails, snapshot consistency)
- Any scenario where partial success would leave data in an inconsistent state

#### Error handling

Errors during transactional batch execution fall into two categories:

**Transient errors** (automatically retried):
- Insufficient RUs (429 throttling) - the entire transaction is retried according to the SDK's retry policy
- Temporary network issues or service unavailability - automatic retry with exponential backoff

**Non-transient errors** (Spark job fails):
- Transaction size limit exceeded (> 100 operations or > 2MB payload)
- Document validation errors (e.g., blank ID, schema violations)
- Unique key constraint violations
- Any service-side rejection that cannot be resolved by retry

When any operation in a transaction fails with a non-transient error, the entire transaction is rolled back and no documents are modified. The Spark task will fail and may retry according to Spark's retry policy.
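
Because the 100-operation and 2 MB limits are enforced service-side, one pragmatic option is to fail fast on oversized partition key groups before writing. Below is a hedged sketch, assuming a hypothetical partition key column `tenantId`; the 2 MB payload limit is not checked here:

```python
from pyspark.sql import functions as F

# Count operations per partition key value; any group above 100 operations would be
# rejected by the service when transactional batch mode is enabled.
oversized = df.groupBy("tenantId").count().filter(F.col("count") > 100)

if oversized.take(1):
    raise ValueError(
        "Some partition key values exceed 100 operations; split or aggregate them "
        "before enabling spark.cosmos.write.bulk.transactional.")
```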

## Preparation
Below are a couple of tips and best practices that can help you prepare for a data migration into a Cosmos DB container.

@@ -113,7 +113,6 @@ private[spark] object CosmosConfigNames {
val ClientTelemetryEnabled = "spark.cosmos.clientTelemetry.enabled" // keep this to avoid breaking changes
val ClientTelemetryEndpoint = "spark.cosmos.clientTelemetry.endpoint" // keep this to avoid breaking changes
val WriteBulkEnabled = "spark.cosmos.write.bulk.enabled"
val WriteBulkTransactional = "spark.cosmos.write.bulk.transactional"
val WriteBulkMaxPendingOperations = "spark.cosmos.write.bulk.maxPendingOperations"
val WriteBulkMaxBatchSize = "spark.cosmos.write.bulk.maxBatchSize"
val WriteBulkMinTargetBatchSize = "spark.cosmos.write.bulk.minTargetBatchSize"
@@ -243,7 +242,6 @@ private[spark] object CosmosConfigNames {
ClientTelemetryEnabled,
ClientTelemetryEndpoint,
WriteBulkEnabled,
WriteBulkTransactional,
WriteBulkMaxPendingOperations,
WriteBulkMaxConcurrentPartitions,
WriteBulkPayloadSizeInBytes,
@@ -1489,14 +1487,6 @@ private object CosmosWriteConfig {
parseFromStringFunction = bulkEnabledAsString => bulkEnabledAsString.toBoolean,
helpMessage = "Cosmos DB Item Write bulk enabled")

private val bulkTransactional = CosmosConfigEntry[Boolean](key = CosmosConfigNames.WriteBulkTransactional,
mandatory = false,
defaultValue = Option.apply(false),
parseFromStringFunction = bulkTransactionalAsString => bulkTransactionalAsString.toBoolean,
helpMessage = "Cosmos DB Item Write bulk transactional batch mode enabled - requires bulk write to be enabled. " +
"Spark 3.5+ provides automatic distribution/ordering for transactional batch. " +
"On Spark 3.3/3.4, transactional batch is supported but requires manual sorting for optimal performance.")

private val microBatchPayloadSizeInBytes = CosmosConfigEntry[Int](key = CosmosConfigNames.WriteBulkPayloadSizeInBytes,
defaultValue = Option.apply(BatchRequestResponseConstants.DEFAULT_MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES),
mandatory = false,
@@ -1769,7 +1759,7 @@ private object CosmosWriteConfig {
val itemWriteStrategyOpt = CosmosConfigEntry.parse(cfg, itemWriteStrategy)
val maxRetryCountOpt = CosmosConfigEntry.parse(cfg, maxRetryCount)
val bulkEnabledOpt = CosmosConfigEntry.parse(cfg, bulkEnabled)
val bulkTransactionalOpt = CosmosConfigEntry.parse(cfg, bulkTransactional)
val bulkTransactionalOpt = Some(false)
var patchConfigsOpt = Option.empty[CosmosPatchConfigs]
val throughputControlConfigOpt = CosmosThroughputControlConfig.parseThroughputControlConfig(cfg)
val microBatchPayloadSizeInBytesOpt = CosmosConfigEntry.parse(cfg, microBatchPayloadSizeInBytes)