Skip to content

Core, Spark: Clean up uncommitted files when a staged table is aborted#16388

Open
wombatu-kun wants to merge 1 commit into
apache:mainfrom
wombatu-kun:spark-staged-table-abort-cleanup
Open

Core, Spark: Clean up uncommitted files when a staged table is aborted#16388
wombatu-kun wants to merge 1 commit into
apache:mainfrom
wombatu-kun:spark-staged-table-abort-cleanup

Conversation

@wombatu-kun
Copy link
Copy Markdown
Contributor

Summary

StagedSparkTable.abortStagedChanges() was an empty // TODO: clean up. Spark calls it to roll back an atomic CTAS/RTAS, and it is also used by the snapshot/migrate actions. Iceberg's internal transaction cleanup only runs when commitTransaction() is actually invoked and then fails. When a failure happens after the staged write but before commitStagedChanges() is called (for example, SnapshotTableSparkAction/MigrateTableSparkAction failing while importing data, or an interrupted CTAS), nothing cleaned the manifest list and manifests already written into the uncommitted transaction. For a staged CREATE the table is never registered, so those orphans have no table metadata pointing at them and are unreachable by removeOrphanFiles — they leak permanently.

Changes

  • Add a best-effort default void abortTransaction() to the Transaction API. The default is a documented no-op, so existing implementations are unaffected.
  • Override it in BaseTransaction to run the existing cleanUp() (cleanAllUpdates() + deleteUncommittedFiles()) — the same cleanup Iceberg already performs when a create/replace transaction's own commit fails.
  • Delegate it in CommitCallbackTransaction; the post-commit callback is intentionally not run on abort.
  • Wire StagedSparkTable.abortStagedChanges() to transaction.abortTransaction() in all supported Spark versions (3.4, 3.5, 4.0, 4.1).
  • Add the corresponding java.method.addedToInterface entry to .palantir/revapi.yml.

A no-op default (rather than throwing) is used because abortTransaction() runs from catch/finally blocks where a secondary exception would mask the original failure. The underlying deletion is already best-effort and idempotent (CatalogUtil.deleteFile swallows NotFoundException, cleanAllUpdates() suppresses failures), so calling abort after a failed commitStagedChanges() is safe.

Out of scope

Executor-written data files in the write-succeeds-then-commit-fails path are not deleted here. That matches Iceberg's existing create-transaction behavior, and those files are handled by SparkWrite.abort() on write-job failure.

Testing

  • New abort case in core TestCreateTransaction (runs across all format-version templates): stages a create transaction, performs an append, asserts the manifest and manifest list exist, calls abortTransaction(), asserts they are removed and the table is not created, and verifies a second abortTransaction() does not throw.
  • New byte-identical TestStagedSparkTable added to all four Spark version trees: stages a CREATE via SparkCatalog.stageCreate, routes an append into the staged transaction, calls abortStagedChanges(), and asserts the uncommitted manifest/manifest-list files are deleted and the table is not created. The session-catalog parameterization is skipped (it produces RollbackStagedTable, not StagedSparkTable). Per Spark version: 3 configs (hive/hadoop/rest) pass, 1 skipped, 0 failures.
  • ./gradlew revApiCheck, spotlessApply -DallModules, and the tests above all pass.

🤖 Generated with Claude Code

StagedSparkTable.abortStagedChanges() was an empty `// TODO: clean up`, so when an atomic CTAS/RTAS (or the snapshot/migrate actions) failed after the staged write but before commitStagedChanges(), the manifest list and manifests already written into the uncommitted transaction were leaked. For a staged CREATE the table is never registered, so those orphans are unreachable by removeOrphanFiles and leak permanently.

This adds a best-effort `default void abortTransaction()` to the Transaction API, overridden in BaseTransaction to run the existing cleanUp() (cleanAllUpdates() + deleteUncommittedFiles()) and delegated by CommitCallbackTransaction. StagedSparkTable.abortStagedChanges() now calls it across all supported Spark versions (3.4, 3.5, 4.0, 4.1). The default is a no-op rather than throwing because abort runs from catch/finally blocks where a secondary exception would mask the original failure; the underlying deletion is already best-effort and idempotent, so calling abort after a failed commit is safe.

Out of scope (unchanged, pre-existing): executor-written data files in the write-succeeds-then-commit-fails path, which match Iceberg's existing create-transaction behavior and are handled by SparkWrite.abort() on write-job failure.

Tests: a new abort case in core TestCreateTransaction, and a byte-identical TestStagedSparkTable added to all four Spark version trees.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant